Skip to content

Commit

Permalink
Merge pull request #26 from dgrigonis/collapse_externals_only
Browse files Browse the repository at this point in the history
possibility to collapse only external deps to packages
  • Loading branch information
mgedmin authored Feb 1, 2024
2 parents a8df40c + 257e5c5 commit 448c9ba
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 44 deletions.
173 changes: 134 additions & 39 deletions findimports.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
--duplicate)
-N, --noext omit external dependencies
-p, --packages convert the module graph to a package graph
-pE, --package-externals
convert external modules to a packages.
-l PACKAGELEVEL, --level PACKAGELEVEL
collapse subpackages to the topmost Nth levels. Only
used if --packages is given. Default: no limit
Expand All @@ -43,7 +45,12 @@
-I FILE, --ignore FILE
ignore a file or directory; this option can be used
multiple times. Default: ['venv']
-R PREFIX [PREFIX ...], --rmprefix PREFIX [PREFIX ...]
remove PREFIX from displayed node names. This
operation is applied last. Names that collapses to
nothing are removed.
-D MAX_DEPTH, --depth MAX_DEPTH
import depth in ast tree. Default: no limit
FindImports requires Python 3.6 or later.
Expand Down Expand Up @@ -242,7 +249,29 @@ def __repr__(self):
)


class ImportFinder(ast.NodeVisitor):
class DepthVisitor:
def __init__(self, max_depth=None):
self.max_depth = max_depth

def visit(self, node, depth=0):
"""Visit a node."""
method = f'visit_{node.__class__.__name__}'
visitor = getattr(self, method, self.generic_visit)
return visitor(node, depth)

def generic_visit(self, node, depth):
"""Called if no explicit visitor function exists for a node."""
if self.max_depth is None or depth < self.max_depth:
for field, value in ast.iter_fields(node):
if isinstance(value, list):
for item in value:
if isinstance(item, ast.AST):
self.visit(item, depth + 1)
elif isinstance(value, ast.AST):
self.visit(value, depth + 1)


class ImportFinder(DepthVisitor):
"""AST visitor that collects all imported names in its imports attribute.
For example, the following import statements in the AST tree
Expand All @@ -265,9 +294,10 @@ class ImportFinder(ast.NodeVisitor):

lineno_offset = 0 # needed when recursively parsing docstrings

def __init__(self, filename):
def __init__(self, filename, max_depth=None):
self.imports = []
self.filename = filename
super().__init__(max_depth)

def processImport(self, name, imported_as, full_name, level, node):
lineno = adjust_lineno(self.filename,
Expand All @@ -276,12 +306,12 @@ def processImport(self, name, imported_as, full_name, level, node):
info = ImportInfo(full_name, self.filename, lineno, level)
self.imports.append(info)

def visit_Import(self, node):
def visit_Import(self, node, depth):
for alias in node.names:
self.processImport(alias.name, alias.asname, alias.name, None,
node)

def visit_ImportFrom(self, node):
def visit_ImportFrom(self, node, depth):
if node.module == '__future__':
return

Expand All @@ -291,17 +321,18 @@ def visit_ImportFrom(self, node):
fullname = f"{node.module}.{name}" if node.module else name
self.processImport(name, imported_as, fullname, node.level, node)

def visitSomethingWithADocstring(self, node):
def visitSomethingWithADocstring(self, node, depth):
# ClassDef and FunctionDef have a 'lineno' attribute, Module doesn't.
lineno = getattr(node, 'lineno', None)
self.processDocstring(ast.get_docstring(node, clean=False), lineno)
self.generic_visit(node)
docstring = ast.get_docstring(node, clean=False)
self.processDocstring(docstring, lineno, depth)
self.generic_visit(node, depth)

visit_Module = visitSomethingWithADocstring
visit_ClassDef = visitSomethingWithADocstring
visit_FunctionDef = visitSomethingWithADocstring

def processDocstring(self, docstring, lineno):
def processDocstring(self, docstring, lineno, depth):
if not docstring:
return
if lineno is None:
Expand All @@ -325,7 +356,7 @@ def processDocstring(self, docstring, lineno):
filename=self.filename, lineno=lineno), file=sys.stderr)
else:
self.lineno_offset += lineno + example.lineno
self.visit(node)
self.visit(node, depth)
self.lineno_offset -= lineno + example.lineno


Expand Down Expand Up @@ -367,8 +398,8 @@ class ImportFinderAndNameTracker(ImportFinder):
warn_about_duplicates = False
verbose = False

def __init__(self, filename):
ImportFinder.__init__(self, filename)
def __init__(self, filename, max_depth=None):
ImportFinder.__init__(self, filename, max_depth)
self.scope = self.top_level = Scope(name=filename)
self.scope_stack = []
self.unused_names = []
Expand All @@ -388,14 +419,14 @@ def leaveAllScopes(self):
self.unused_names += self.scope.unused_names.values()
self.unused_names.sort(key=attrgetter('lineno'))

def processDocstring(self, docstring, lineno):
def processDocstring(self, docstring, lineno, depth):
self.newScope(self.top_level, 'docstring')
ImportFinder.processDocstring(self, docstring, lineno)
ImportFinder.processDocstring(self, docstring, lineno, depth)
self.leaveScope()

def visit_FunctionDef(self, node):
def visit_FunctionDef(self, node, depth):
self.newScope(self.scope, f"function {node.name}")
ImportFinder.visit_FunctionDef(self, node)
ImportFinder.visit_FunctionDef(self, node, depth)
self.leaveScope()

def processImport(self, name, imported_as, full_name, level, node):
Expand All @@ -420,10 +451,10 @@ def processImport(self, name, imported_as, full_name, level, node):
else:
self.scope.addImport(imported_as, self.filename, level, lineno)

def visit_Name(self, node):
def visit_Name(self, node, depth):
self.scope.useName(node.id)

def visit_Attribute(self, node):
def visit_Attribute(self, node, depth):
full_name = [node.attr]
parent = node.value
while isinstance(parent, ast.Attribute):
Expand All @@ -439,30 +470,30 @@ def visit_Attribute(self, node):
else:
name += part
self.scope.useName(name)
self.generic_visit(node)
self.generic_visit(node, depth)


def find_imports(filename):
def find_imports(filename, max_depth=None):
"""Find all imported names in a given file.
Returns a list of ImportInfo objects.
"""
with tokenize.open(filename) as f:
root = ast.parse(f.read(), filename)
visitor = ImportFinder(filename)
visitor = ImportFinder(filename, max_depth=max_depth)
visitor.visit(root)
return visitor.imports


def find_imports_and_track_names(filename, warn_about_duplicates=False,
verbose=False):
verbose=False, max_depth=None):
"""Find all imported names in a given file.
Returns ``(imports, unused)``. Both are lists of ImportInfo objects.
"""
with tokenize.open(filename) as f:
root = ast.parse(f.read(), filename)
visitor = ImportFinderAndNameTracker(filename)
visitor = ImportFinderAndNameTracker(filename, max_depth)
visitor.warn_about_duplicates = warn_about_duplicates
visitor.verbose = verbose
visitor.visit(root)
Expand Down Expand Up @@ -517,6 +548,7 @@ class ModuleGraph(object):
warn_about_duplicates = False
verbose = False
external_dependencies = True
max_depth = None

# some builtin modules do not exist as separate .so files on disk
builtin_modules = sys.builtin_module_names
Expand Down Expand Up @@ -592,18 +624,24 @@ def parseFile(self, filename, ignore_stdlib_modules):
module.imported_names, module.unused_names = (
find_imports_and_track_names(filename,
self.warn_about_duplicates,
self.verbose)
self.verbose,
self.max_depth)
)
else:
module.imported_names = find_imports(filename)
module.imported_names = find_imports(filename, self.max_depth)
module.unused_names = None
dir = os.path.dirname(filename)

if ignore_stdlib_modules:
module.imported_names = [
info for info in module.imported_names
if info.name.split('.')[0] not in STDLIB_MODNAMES_SET
]
module.imports = {
self.findModuleOfName(imp.name, imp.level, filename, dir)
for imp in module.imported_names}
# NOTE: Remove when certain that this is 100% dealt with above
if ignore_stdlib_modules:
module.imported_names = [info for info in module.imported_names
if info.name not in STDLIB_MODNAMES_SET]
module.imports -= STDLIB_MODNAMES_SET

def filenameToModname(self, filename):
Expand Down Expand Up @@ -729,6 +767,17 @@ def packageOf(self, dotted_name, packagelevel=None):
dotted_name = '.'.join(dotted_name.split('.')[:packagelevel])
return dotted_name

def isExternal(self, modname):
"""Package is external if not present in modules"""
return modname not in self.modules

def maybePackageOf(self, dotted_name,
packagelevel=None, externals_only=False):
"""Provides a flag to not convert internal modules to packages"""
if externals_only and not self.isExternal(dotted_name):
return dotted_name
return self.packageOf(dotted_name, packagelevel)

def removeTestPackage(self, dotted_name, pkgnames=['tests', 'ftests']):
"""Remove tests subpackages from dotted_name."""
result = []
Expand All @@ -746,23 +795,43 @@ def listModules(self):
modules.sort()
return [module for name, module in modules]

def packageGraph(self, packagelevel=None):
def packageGraph(self, packagelevel=None, externals_only=False):
"""Convert a module graph to a package graph."""
packages = {}
for module in self.listModules():
package_name = self.packageOf(module.modname, packagelevel)
package_name = self.maybePackageOf(
module.modname, packagelevel, externals_only)
if package_name not in packages:
dirname = os.path.dirname(module.filename)
packages[package_name] = Module(package_name, dirname)
package = packages[package_name]
for name in module.imports:
package_name = self.packageOf(name, packagelevel)
package_name = self.maybePackageOf(
name, packagelevel, externals_only)
if package_name != package.modname: # no loops
package.imports.add(package_name)
graph = ModuleGraph()
graph.modules = packages
return graph

def removePrefixes(self, prefixes):
"""Remove prefixes. Only applies 1st hit."""
prfx_union = '|'.join(map(re.escape, prefixes))
reg_cmp = re.compile(r'^(({})\.)?'.format(prfx_union))
packages = {}
for module in self.listModules():
new_modname = reg_cmp.sub('', module.modname)
if new_modname:
packages[new_modname] = Module(new_modname, module.filename)
for name in module.imports:
new_name = reg_cmp.sub('', name)
if new_name and new_name != new_modname: # no loops
packages[new_modname].imports.add(new_name)
graph = ModuleGraph()
packages = dict(sorted(packages.items(), key=lambda x: x[0]))
graph.modules = packages
return graph

def collapseTests(self, pkgnames=['tests', 'ftests']):
"""Collapse test packages with parent packages.
Expand Down Expand Up @@ -893,33 +962,40 @@ def printUnusedImports(self):
continue
print(f"{module.filename}:{lineno}: {name} not used")

def printDot(self):
def constructDot(self):
"""Produce a dependency graph in dot format."""
print("digraph ModuleDependencies {")
print(" node[shape=box];")
lines = list()
lines.append("digraph ModuleDependencies {")
lines.append(" node[shape=box];")
allNames = set()
nameDict = {}
for n, module in enumerate(self.listModules()):
module._dot_name = f"mod{n}"
nameDict[module.modname] = module._dot_name
print(f" {module._dot_name}[label=\"{quote(module.label)}\"];")
line = f" {module._dot_name}[label=\"{quote(module.label)}\"];"
lines.append(line)
allNames |= module.imports
print(" node[style=dotted];")
lines.append(" node[style=dotted];")
if self.external_dependencies:
myNames = set(self.modules)
extNames = list(allNames - myNames)
extNames.sort()
for n, name in enumerate(extNames):
nameDict[name] = id = f"extmod{n}"
print(f" {id}[label=\"{name}\"];")
lines.append(f" {id}[label=\"{name}\"];")
for modname, module in sorted(self.modules.items()):
for other in sorted(module.imports):
if other in nameDict:
print(" {0} -> {1};".format(
lines.append(" {0} -> {1};".format(
nameDict[module.modname],
nameDict[other]
))
print("}")
lines.append("}")
return '\n'.join(lines)

def printDot(self):
"""Print a dependency graph in dot format."""
print(self.constructDot())


def quote(s):
Expand Down Expand Up @@ -981,6 +1057,9 @@ def main(argv=None):
options.add_argument('-p', '--packages', action='store_true',
dest='condense_to_packages',
help='convert the module graph to a package graph')
options.add_argument('-pE', '--package-externals', action='store_true',
dest='condense_to_packages_externals',
help='convert external modules to a packages.')
options.add_argument('-l', '--level', type=int,
dest='packagelevel',
help='collapse subpackages to the topmost Nth levels.'
Expand All @@ -1001,12 +1080,22 @@ def main(argv=None):
help="ignore a file or directory;"
" this option can be used multiple times."
" Default: ['venv']")
options.add_argument('-R', '--rmprefix', metavar="PREFIX", nargs="+",
help="remove PREFIX from displayed node names. "
"This operation is applied last. "
"Names that collapses to nothing are removed.")
options.add_argument('-D', '--depth', type=int,
dest='max_depth',
help='import depth in ast tree. Default: no limit')
try:
args = parser.parse_args(args=argv[1:] if argv else None)
if args.condense_to_packages and args.condense_to_packages_externals:
parser.error('only one of -p and -pE can be provided')
except SystemExit as e:
return e.code

g = ModuleGraph()
g.max_depth = args.max_depth
g.all_unused = args.all_unused
g.warn_about_duplicates = args.warn_about_duplicates
g.verbose = args.verbose
Expand All @@ -1017,12 +1106,18 @@ def main(argv=None):
ignore_stdlib_modules=args.ignore_stdlib)
if args.write_cache:
g.writeCache(args.write_cache)

if args.condense_to_packages:
g = g.packageGraph(args.packagelevel)
g = g.packageGraph(args.packagelevel, externals_only=False)
elif args.condense_to_packages_externals:
g = g.packageGraph(args.packagelevel, externals_only=True)

if args.collapse_tests:
g = g.collapseTests()
if args.collapse_cycles:
g = g.collapseCycles()
if args.rmprefix is not None:
g = g.removePrefixes(args.rmprefix)
g.external_dependencies = not args.noext
getattr(g, args.action)()
return 0
Expand Down
Loading

0 comments on commit 448c9ba

Please sign in to comment.