From 6a1be337f6666191c9391d166e33259adca24e14 Mon Sep 17 00:00:00 2001 From: Jason Peacock Date: Fri, 16 Jun 2023 16:23:13 -0500 Subject: [PATCH] Factor file path related functions in Utils.py into their own path.py module. The functions include binPath, varPath, zenPath, zenpathjoin, zenpathsplit, and isZenBinFile. --- Products/ZenUtils/Utils.py | 264 ++--------------------------------- Products/ZenUtils/path.py | 277 +++++++++++++++++++++++++++++++++++++ 2 files changed, 286 insertions(+), 255 deletions(-) create mode 100644 Products/ZenUtils/path.py diff --git a/Products/ZenUtils/Utils.py b/Products/ZenUtils/Utils.py index e9380bec14..fd2d912613 100644 --- a/Products/ZenUtils/Utils.py +++ b/Products/ZenUtils/Utils.py @@ -63,6 +63,15 @@ setLogLevel, setWebLoggingStream, ) +from .path import ( # noqa F401 + binPath, + isZenBinFile, + varPath, + zenPath, + zenpathjoin, + zenpathsplit, + zopePath, +) from .Threading import ( # noqa: F401 InterruptableThread, LineReader, @@ -665,44 +674,6 @@ def filter(obj): return getSubObjectsMemo(base, filter=filter, descend=descend) -def zenpathsplit(pathstring): - """Return the parts of a path with extraneous spaces removed. - - >>> zenpathsplit('/zport/dmd/Devices') - ['zport', 'dmd', 'Devices'] - >>> zenpathsplit(' a /b / c') - ['a', 'b', 'c'] - - @param pathstring: a path inside of ZENHOME - @type pathstring: string - @return: a path - @rtype: string - """ - path = pathstring.split("/") - path = filter(lambda x: x, path) - path = map(lambda x: x.strip(), path) - return path - - -def zenpathjoin(pathar): - """Return a string that is the path formed from its parts. - - The returned path is always an absolute path. - - >>> zenpathjoin(('zport', 'dmd', 'Devices', 'Server')) - '/zport/dmd/Devices/Server' - >>> zenpathjoin(('', 'zport', 'dmd', 'Devices', 'Server')) - '/zport/dmd/Devices/Server' - - @param pathar: a path - @type pathar: string - @return: a path - @rtype: string - """ - path = "/".join(pathar) - return path if path.startswith("/") else "/" + path - - def createHierarchyObj(root, name, factory, relpath="", llog=None): """ Create a hierarchy object from its path we use relpath to skip down @@ -1041,216 +1012,6 @@ def edgesToXML(edges, start=()): return xmldoc -def _normalize_path(path): - """Return the given path sans extraneous spaces and slashes. - - Trailing slashes are removed. - - >>> _normalize_path('a') - 'a' - >>> _normalize_path('/a') - '/a' - >>> _normalize_path('/a/b/') - '/a/b' - >>> _normalize_path('a/b/') - 'a/b' - >>> _normalize_path('a//b/') - 'a/b' - >>> _normalize_path('//a//b/') - '/a/b' - >>> _normalize_path(' / a / b / ') - '/a/b' - >>> _normalize_path(' a / b / ') - 'a/b' - >>> _normalize_path(' a / b ') - 'a/b' - """ - # removes leading/trailing spaces from path parts and removes path parts - # that are empty strings. - parts = [p.strip() for p in path.split("/")] - # Never eliminate the first part - return "/".join(parts[0:1] + [p for p in parts[1:] if p]) - - -def sane_pathjoin(base_path, *args): - """Returns a path string constructed from the arguments. - - The first argument ('base_path') is always the root part of the path. - This differs from os.path.join's behavior of discarding earlier path - parts if later path parts have a leading slash. - - The base_path and *args are two paths to be joined. If the left-most - parts of *args matches base_path, only the parts after the match are - used in the resulting path. - - >>> sane_pathjoin('a') - 'a' - >>> sane_pathjoin('/a') - '/a' - >>> sane_pathjoin('/a', 'b', 'c') - '/a/b/c' - >>> sane_pathjoin('/a', '/b', '/c') - '/a/b/c' - >>> sane_pathjoin('/a', 'b', '/c') - '/a/b/c' - >>> sane_pathjoin('a', 'b', 'c') - 'a/b/c' - >>> sane_pathjoin('a', '/b', '/c') - 'a/b/c' - >>> sane_pathjoin('a', 'b', '/c') - 'a/b/c' - >>> sane_pathjoin('a', '') - 'a' - >>> sane_pathjoin('a', '', 'b') - 'a/b' - >>> sane_pathjoin('/a ', ' b ', '/ c', '/d ') - '/a/b/c/d' - >>> sane_pathjoin('/a/b', '/a/b', 'c') - '/a/b/c' - >>> sane_pathjoin('/a/b', 'a', 'b', 'c') - '/a/b/c' - >>> sane_pathjoin('a/b', '/a/b', 'c') - 'a/b/c' - >>> sane_pathjoin('a/b', 'a', 'b', 'c') - 'a/b/c' - - @param base_path: Base path to assume everything is rooted from. - @type base_path: string - @param *args: Path parts that follow base_path. - @type *args: Sequence of strings - @return: sanitized path - @rtype: string - """ - root = _normalize_path(base_path) - subpath = _normalize_path("/".join(args)) - if subpath: - # subpath should always be a relative path. - if subpath[0] == "/": - subpath = subpath[1:] - # Get a relative path from the root path. - relbase = root[1:] if root[0:1] == "/" else root - if relbase and subpath.startswith(relbase): - subpath = subpath[len(relbase) + 1 :] - return "/".join((root, subpath)) - return root - - -def varPath(*args): - """Return a path relative to /var/zenoss specified by joining args. - - The path is not guaranteed to exist on the filesystem. - """ - return sane_pathjoin("/var/zenoss", *args) - - -def zenPath(*args): - """Return a path relative to $ZENHOME specified by joining args. - - The path is not guaranteed to exist on the filesystem. - - >>> import os - >>> zenHome = os.environ['ZENHOME'] - >>> zenPath() == zenHome - True - >>> zenPath( '' ) == zenHome - True - >>> zenPath('Products') == os.path.join(zenHome, 'Products') - True - >>> zenPath('/Products/') == zenPath('Products') - True - >>> - >>> zenPath('Products', 'foo') == zenPath('Products/foo') - True - - # NB: The following is *NOT* true for os.path.join() - >>> zenPath('/Products', '/foo') == zenPath('Products/foo') - True - >>> zenPath(zenPath('Products')) == zenPath('Products') - True - >>> zenPath(zenPath('Products'), 'orange', 'blue' ) \ - == zenPath('Products', 'orange', 'blue' ) - True - - # Pathological case - # NB: need to expand out the array returned by split() - >>> zenPath() == zenPath( *'/'.split(zenPath()) ) - True - - @param *args: path components starting from $ZENHOME - @type *args: strings - @todo: determine what the correct behaviour should be if $ZENHOME - is a symlink! - """ - zenhome = os.environ.get("ZENHOME", "") - - path = sane_pathjoin(zenhome, *args) - - # test if ZENHOME based path exists and if not try bitrock-style path. - # if neither exists return the ZENHOME-based path - if not os.path.exists(path): - brPath = os.path.realpath(os.path.join(zenhome, "..", "common")) - testPath = sane_pathjoin(brPath, *args) - if os.path.exists(testPath): - path = testPath - return path - - -def zopePath(*args): - """ - Similar to zenPath() except that this constructs a path based on - ZOPEHOME rather than ZENHOME. This is useful on the appliance. - If ZOPEHOME is not defined or is empty then return ''. - NOTE: A non-empty return value does not guarantee that the path exists, - just that ZOPEHOME is defined. - - >>> import os - >>> zopeHome = os.environ.setdefault('ZOPEHOME', '/something') - >>> zopePath('bin') == os.path.join(zopeHome, 'bin') - True - >>> zopePath(zopePath('bin')) == zopePath('bin') - True - - @param *args: path components starting from $ZOPEHOME - @type *args: strings - """ - zopehome = os.environ.get("ZOPEHOME", "") - return sane_pathjoin(zopehome, *args) - - -def binPath(fileName): - """ - Search for the given file in a list of possible locations. Return - either the full path to the file or '' if the file was not found. - - >>> len(binPath('zenoss')) > 0 - True - >>> len(binPath('zeoup.py')) > 0 # This doesn't exist in Zope 2.12 - False - >>> binPath('Idontexistreally') == '' - True - - @param fileName: name of executable - @type fileName: string - @return: path to file or '' if not found - @rtype: string - """ - # bin and libexec are the usual suspect locations - paths = [zenPath(d, fileName) for d in ("bin", "libexec")] - # $ZOPEHOME/bin is an additional option for appliance - paths.append(zopePath("bin", fileName)) - # Also check the standard locations for Nagios plugins - # (/usr/lib(64)/nagios/plugins) - paths.extend( - sane_pathjoin(d, fileName) - for d in ("/usr/lib/nagios/plugins", "/usr/lib64/nagios/plugins") - ) - - for path in paths: - if os.path.isfile(path): - return path - return "" - - def extractPostContent(REQUEST): """ IE puts the POST content in one place in the REQUEST object, and Firefox in @@ -2087,13 +1848,6 @@ def callWithShutdown(func, *args, **kwargs): return callWithShutdown -def isZenBinFile(name): - """Check if given name is a valid file in $ZENHOME/bin.""" - if os.path.sep in name: - return False - return os.path.isfile(binPath(name)) - - def wait(seconds): """Delays execution of subsequent code. diff --git a/Products/ZenUtils/path.py b/Products/ZenUtils/path.py new file mode 100644 index 0000000000..0b214fc563 --- /dev/null +++ b/Products/ZenUtils/path.py @@ -0,0 +1,277 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2023 all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import + +import os + +from pathlib2 import PurePath, Path + +__all__ = ( + "binPath", + "isZenBinFile", + "varPath", + "zenPath", + "zenpathjoin", + "zenpathsplit", + "zopePath", +) + + +def zenpathsplit(pathstring): + """Returns a path string without any spaces in its parts. + + >>> zenpathsplit('/zport/dmd/Devices') + ['zport', 'dmd', 'Devices'] + >>> zenpathsplit(' a /b / c') + ['a', 'b', 'c'] + >>> zenpathsplit('') + [] + >>> zenpathsplit('/') + [] + >>> zenpathsplit('//') + ['//'] + >>> zenpathsplit('///') + [] + + @param pathstring: a path inside of ZENHOME + @type pathstring: string + @return: a path + @rtype: string + """ + path = _normalize_path(pathstring) + if not path.parts: + return [] + return list(path.parts if path.parts[0] != "/" else path.parts[1:]) + + +def zenpathjoin(path): + """Returns a joined path in its string form. + + >>> zenpathjoin(('zport', 'dmd', 'Devices', 'Server')) + '/zport/dmd/Devices/Server' + >>> zenpathjoin(('', 'zport', 'dmd', 'Devices', 'Server')) + '/zport/dmd/Devices/Server' + >>> zenpathjoin('opt') + '/opt' + >>> zenpathjoin(['opt', 'a', 'b']) + '/opt/a/b' + >>> zenpathjoin(['', 'opt', 'a', 'b']) + '/opt/a/b' + >>> zenpathjoin(['', 'opt', '', 'a', 'b']) + '/opt/a/b' + + @param path: One or more parts of a path. + @type path: string or sequence + @return: a path + @rtype: string + """ + parts = [path] if isinstance(path, basestring) else path + return str(PurePath("/").joinpath(*parts)) + + +def varPath(*args): + """Return a path relative to /var/zenoss specified by joining args. + + The path is not guaranteed to exist on the filesystem. + """ + return _pathjoin("/var/zenoss", *args) + + +def zenPath(*args): + """Return a path relative to $ZENHOME specified by joining args. + + The path is not guaranteed to exist on the filesystem. + + >>> import os + >>> zenHome = os.environ['ZENHOME'] + >>> zenPath() == zenHome + True + >>> zenPath('') == zenHome + True + >>> zenPath('Products') == os.path.join(zenHome, 'Products') + True + >>> zenPath('/Products/') == zenPath('Products') + True + >>> + >>> zenPath('Products', 'foo') == zenPath('Products/foo') + True + + # NB: The following is *NOT* true for os.path.join() + >>> zenPath('/Products', '/foo') == zenPath('Products/foo') + True + >>> products = zenPath('Products') + >>> zenPath(products) == products + True + >>> product_colors = zenPath('Products', 'orange', 'blue') + >>> zenPath(zenPath('Products'), 'orange', 'blue') == product_colors + True + + # Pathological case + # NB: need to expand out the array returned by split() + >>> zenPath() == zenPath( *'/'.split(zenPath()) ) + True + + @param *args: path components starting from $ZENHOME + @type *args: strings + @todo: determine what the correct behaviour should be if $ZENHOME is a + symlink! + """ + zenhome = os.environ.get("ZENHOME", "") + return _pathjoin(zenhome, *args) + + +def zopePath(*args): + """ + Similar to zenPath() except that this constructs a path based on + ZOPEHOME rather than ZENHOME. This is useful on the appliance. + If ZOPEHOME is not defined or is empty then return ''. + NOTE: A non-empty return value does not guarantee that the path exists, + just that ZOPEHOME is defined. + + >>> import os + >>> zopeHome = os.environ.setdefault('ZOPEHOME', '/something') + >>> zopePath('bin') == os.path.join(zopeHome, 'bin') + True + >>> zopePath(zopePath('bin')) == zopePath('bin') + True + + @param *args: path components starting from $ZOPEHOME + @type *args: strings + """ + zopehome = os.environ.get("ZOPEHOME", "") + return _pathjoin(zopehome, *args) + + +def binPath(filename): + """ + Search for the given file in a list of possible locations. Return + either the full path to the file or '' if the file was not found. + + >>> len(binPath('zenoss')) > 0 + True + >>> len(binPath('zeoup.py')) > 0 # This doesn't exist in Zope 2.12 + False + >>> binPath('Idontexistreally') == '' + True + + @param fileName: name of executable + @type fileName: string + @return: path to file or '' if not found + @rtype: string + """ + paths = ( + Path(zenPath("bin", filename)), + Path(zenPath("libexec", filename)), + Path(zopePath("bin", filename)), + Path(_pathjoin("/usr/lib/nagios/plugins", filename)), + Path(_pathjoin("/usr/lib64/nagios/plugins", filename)), + ) + return str(next((p for p in paths if p.is_file()), "")) + + +def isZenBinFile(name): + """Check if given name is a valid file in $ZENHOME/bin.""" + return Path(binPath(name)).is_file() + + +def _normalize_path(pathstring): + """Return a path string with no spaces surrounding each path part. + + Trailing slash is also removed. + + >>> _normalize_path('/opt/') + PurePosixPath('/opt') + >>> _normalize_path('/ a/b / c /d') + PurePosixPath('/a/b/c/d') + >>> _normalize_path('opt/') + PurePosixPath('opt') + >>> _normalize_path(' a/b / c /d') + PurePosixPath('a/b/c/d') + """ + return PurePath(*(p.strip() for p in PurePath(pathstring).parts)) + + +def _pathjoin(basepath, *parts): + """Returns a path string constructed from the arguments. + + The first argument ('base_path') is always the root part of the path. + This differs from os.path.join's behavior of discarding earlier path + parts if later path parts have a leading slash. + + The base_path and *args are two paths to be joined. If the left-most + parts of *args matches base_path, only the parts after the match are + used in the resulting path. + + >>> _pathjoin('/opt/') + '/opt' + >>> _pathjoin('/opt', '/a/', 'b/') + '/opt/a/b' + >>> _pathjoin('/opt', _pathjoin('/opt', 'a', 'b')) + '/opt/a/b' + >>> _pathjoin('/opt/a', '/opt/a/b') + '/opt/a/b' + >>> _pathjoin('/opt/a', 'opt/a/b') + '/opt/a/b' + >>> _pathjoin('/opt/a', '/opt/b/c') + '/opt/a/opt/b/c' + >>> _pathjoin('/bin', '/opt/bin/c') + '/bin/opt/bin/c' + >>> _pathjoin('bin', 'opt/bin/c') + 'bin/opt/bin/c' + >>> _pathjoin('bin', '/opt/bin/c') + 'bin/opt/bin/c' + >>> _pathjoin('bin', 'bin/c') + 'bin/c' + >>> _pathjoin('a') + 'a' + >>> _pathjoin('/a') + '/a' + >>> _pathjoin('/a', 'b', 'c') + '/a/b/c' + >>> _pathjoin('/a', '/b', '/c') + '/a/b/c' + >>> _pathjoin('/a', 'b', '/c') + '/a/b/c' + >>> _pathjoin('a', 'b', 'c') + 'a/b/c' + >>> _pathjoin('a', '/b', '/c') + 'a/b/c' + >>> _pathjoin('a', 'b', '/c') + 'a/b/c' + >>> _pathjoin('a', '') + 'a' + >>> _pathjoin('a', '', 'b') + 'a/b' + >>> _pathjoin('/a ', ' b ', '/ c', '/d ') + '/a/b/c/d' + >>> _pathjoin('/a/b', '/a/b', 'c') + '/a/b/c' + >>> _pathjoin('/a/b', '/a/', 'b/', 'c') + '/a/b/c' + >>> _pathjoin('a/b', '/a/b', 'c') + 'a/b/c' + >>> _pathjoin('a/b', 'a', 'b', 'c') + 'a/b/c' + + @param base_path: Base path to assume everything is rooted from. + @type base_path: string + @param *args: Path parts that follow base_path. + @type *args: Sequence of strings + @return: sanitized path + @rtype: string + """ + base = _normalize_path(basepath) + if not parts: + return str(base) + subpath = PurePath(*(p.strip("/").strip() for p in parts)) + relbase = PurePath(*base.parts[1:]) if base.is_absolute() else base + if relbase in subpath.parents: + subpath = subpath.relative_to(relbase) + return str(base.joinpath(subpath))