Skip to content

Commit

Permalink
Factor file path related functions in Utils.py into their own path.py…
Browse files Browse the repository at this point in the history
… module.

The functions include binPath, varPath, zenPath, zenpathjoin, zenpathsplit, and isZenBinFile.
  • Loading branch information
jpeacock-zenoss committed Aug 3, 2023
1 parent db71217 commit 7a1efc3
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 255 deletions.
264 changes: 9 additions & 255 deletions Products/ZenUtils/Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@
setLogLevel,
setWebLoggingStream,
)
from .path import ( # noqa F401
binPath,
isZenBinFile,
varPath,
zenPath,
zenpathjoin,
zenpathsplit,
zopePath,
)
from .Threading import ( # noqa: F401
InterruptableThread,
LineReader,
Expand Down Expand Up @@ -665,44 +674,6 @@ def filter(obj):
return getSubObjectsMemo(base, filter=filter, descend=descend)


def zenpathsplit(pathstring):
"""Return the parts of a path with extraneous spaces removed.
>>> zenpathsplit('/zport/dmd/Devices')
['zport', 'dmd', 'Devices']
>>> zenpathsplit(' a /b / c')
['a', 'b', 'c']
@param pathstring: a path inside of ZENHOME
@type pathstring: string
@return: a path
@rtype: string
"""
path = pathstring.split("/")
path = filter(lambda x: x, path)
path = map(lambda x: x.strip(), path)
return path


def zenpathjoin(pathar):
"""Return a string that is the path formed from its parts.
The returned path is always an absolute path.
>>> zenpathjoin(('zport', 'dmd', 'Devices', 'Server'))
'/zport/dmd/Devices/Server'
>>> zenpathjoin(('', 'zport', 'dmd', 'Devices', 'Server'))
'/zport/dmd/Devices/Server'
@param pathar: a path
@type pathar: string
@return: a path
@rtype: string
"""
path = "/".join(pathar)
return path if path.startswith("/") else "/" + path


def createHierarchyObj(root, name, factory, relpath="", llog=None):
"""
Create a hierarchy object from its path we use relpath to skip down
Expand Down Expand Up @@ -1041,216 +1012,6 @@ def edgesToXML(edges, start=()):
return xmldoc


def _normalize_path(path):
"""Return the given path sans extraneous spaces and slashes.
Trailing slashes are removed.
>>> _normalize_path('a')
'a'
>>> _normalize_path('/a')
'/a'
>>> _normalize_path('/a/b/')
'/a/b'
>>> _normalize_path('a/b/')
'a/b'
>>> _normalize_path('a//b/')
'a/b'
>>> _normalize_path('//a//b/')
'/a/b'
>>> _normalize_path(' / a / b / ')
'/a/b'
>>> _normalize_path(' a / b / ')
'a/b'
>>> _normalize_path(' a / b ')
'a/b'
"""
# removes leading/trailing spaces from path parts and removes path parts
# that are empty strings.
parts = [p.strip() for p in path.split("/")]
# Never eliminate the first part
return "/".join(parts[0:1] + [p for p in parts[1:] if p])


def sane_pathjoin(base_path, *args):
"""Returns a path string constructed from the arguments.
The first argument ('base_path') is always the root part of the path.
This differs from os.path.join's behavior of discarding earlier path
parts if later path parts have a leading slash.
The base_path and *args are two paths to be joined. If the left-most
parts of *args matches base_path, only the parts after the match are
used in the resulting path.
>>> sane_pathjoin('a')
'a'
>>> sane_pathjoin('/a')
'/a'
>>> sane_pathjoin('/a', 'b', 'c')
'/a/b/c'
>>> sane_pathjoin('/a', '/b', '/c')
'/a/b/c'
>>> sane_pathjoin('/a', 'b', '/c')
'/a/b/c'
>>> sane_pathjoin('a', 'b', 'c')
'a/b/c'
>>> sane_pathjoin('a', '/b', '/c')
'a/b/c'
>>> sane_pathjoin('a', 'b', '/c')
'a/b/c'
>>> sane_pathjoin('a', '')
'a'
>>> sane_pathjoin('a', '', 'b')
'a/b'
>>> sane_pathjoin('/a ', ' b ', '/ c', '/d ')
'/a/b/c/d'
>>> sane_pathjoin('/a/b', '/a/b', 'c')
'/a/b/c'
>>> sane_pathjoin('/a/b', 'a', 'b', 'c')
'/a/b/c'
>>> sane_pathjoin('a/b', '/a/b', 'c')
'a/b/c'
>>> sane_pathjoin('a/b', 'a', 'b', 'c')
'a/b/c'
@param base_path: Base path to assume everything is rooted from.
@type base_path: string
@param *args: Path parts that follow base_path.
@type *args: Sequence of strings
@return: sanitized path
@rtype: string
"""
root = _normalize_path(base_path)
subpath = _normalize_path("/".join(args))
if subpath:
# subpath should always be a relative path.
if subpath[0] == "/":
subpath = subpath[1:]
# Get a relative path from the root path.
relbase = root[1:] if root[0:1] == "/" else root
if relbase and subpath.startswith(relbase):
subpath = subpath[len(relbase) + 1 :]
return "/".join((root, subpath))
return root


def varPath(*args):
"""Return a path relative to /var/zenoss specified by joining args.
The path is not guaranteed to exist on the filesystem.
"""
return sane_pathjoin("/var/zenoss", *args)


def zenPath(*args):
"""Return a path relative to $ZENHOME specified by joining args.
The path is not guaranteed to exist on the filesystem.
>>> import os
>>> zenHome = os.environ['ZENHOME']
>>> zenPath() == zenHome
True
>>> zenPath( '' ) == zenHome
True
>>> zenPath('Products') == os.path.join(zenHome, 'Products')
True
>>> zenPath('/Products/') == zenPath('Products')
True
>>>
>>> zenPath('Products', 'foo') == zenPath('Products/foo')
True
# NB: The following is *NOT* true for os.path.join()
>>> zenPath('/Products', '/foo') == zenPath('Products/foo')
True
>>> zenPath(zenPath('Products')) == zenPath('Products')
True
>>> zenPath(zenPath('Products'), 'orange', 'blue' ) \
== zenPath('Products', 'orange', 'blue' )
True
# Pathological case
# NB: need to expand out the array returned by split()
>>> zenPath() == zenPath( *'/'.split(zenPath()) )
True
@param *args: path components starting from $ZENHOME
@type *args: strings
@todo: determine what the correct behaviour should be if $ZENHOME
is a symlink!
"""
zenhome = os.environ.get("ZENHOME", "")

path = sane_pathjoin(zenhome, *args)

# test if ZENHOME based path exists and if not try bitrock-style path.
# if neither exists return the ZENHOME-based path
if not os.path.exists(path):
brPath = os.path.realpath(os.path.join(zenhome, "..", "common"))
testPath = sane_pathjoin(brPath, *args)
if os.path.exists(testPath):
path = testPath
return path


def zopePath(*args):
"""
Similar to zenPath() except that this constructs a path based on
ZOPEHOME rather than ZENHOME. This is useful on the appliance.
If ZOPEHOME is not defined or is empty then return ''.
NOTE: A non-empty return value does not guarantee that the path exists,
just that ZOPEHOME is defined.
>>> import os
>>> zopeHome = os.environ.setdefault('ZOPEHOME', '/something')
>>> zopePath('bin') == os.path.join(zopeHome, 'bin')
True
>>> zopePath(zopePath('bin')) == zopePath('bin')
True
@param *args: path components starting from $ZOPEHOME
@type *args: strings
"""
zopehome = os.environ.get("ZOPEHOME", "")
return sane_pathjoin(zopehome, *args)


def binPath(fileName):
"""
Search for the given file in a list of possible locations. Return
either the full path to the file or '' if the file was not found.
>>> len(binPath('zenoss')) > 0
True
>>> len(binPath('zeoup.py')) > 0 # This doesn't exist in Zope 2.12
False
>>> binPath('Idontexistreally') == ''
True
@param fileName: name of executable
@type fileName: string
@return: path to file or '' if not found
@rtype: string
"""
# bin and libexec are the usual suspect locations
paths = [zenPath(d, fileName) for d in ("bin", "libexec")]
# $ZOPEHOME/bin is an additional option for appliance
paths.append(zopePath("bin", fileName))
# Also check the standard locations for Nagios plugins
# (/usr/lib(64)/nagios/plugins)
paths.extend(
sane_pathjoin(d, fileName)
for d in ("/usr/lib/nagios/plugins", "/usr/lib64/nagios/plugins")
)

for path in paths:
if os.path.isfile(path):
return path
return ""


def extractPostContent(REQUEST):
"""
IE puts the POST content in one place in the REQUEST object, and Firefox in
Expand Down Expand Up @@ -2087,13 +1848,6 @@ def callWithShutdown(func, *args, **kwargs):
return callWithShutdown


def isZenBinFile(name):
"""Check if given name is a valid file in $ZENHOME/bin."""
if os.path.sep in name:
return False
return os.path.isfile(binPath(name))


def wait(seconds):
"""Delays execution of subsequent code.
Expand Down
Loading

0 comments on commit 7a1efc3

Please sign in to comment.