commit: ff280478d9be3b3abac919c4e553270a91c2d341
Author: Brian Harring <ferringb <AT> gmail <DOT> com>
AuthorDate: Sun Nov 30 18:00:56 2025 +0000
Commit: Brian Harring <ferringb <AT> gmail <DOT> com>
CommitDate: Sun Nov 30 20:01:45 2025 +0000
URL:
https://gitweb.gentoo.org/proj/pkgcore/snakeoil.git/commit/?id=ff280478
chore: find_unused_exports basically fully works now
This won't pick up getattr usage or importlib, nor
commutative assigns of an import variable, but
it catches the rest.
Signed-off-by: Brian Harring <ferringb <AT> gmail.com>
src/snakeoil/tools/find_unused_exports.py | 275 +++++++++++++++++++++++-------
1 file changed, 217 insertions(+), 58 deletions(-)
diff --git a/src/snakeoil/tools/find_unused_exports.py
b/src/snakeoil/tools/find_unused_exports.py
index 36addae..7a6f43d 100644
--- a/src/snakeoil/tools/find_unused_exports.py
+++ b/src/snakeoil/tools/find_unused_exports.py
@@ -7,9 +7,11 @@ import argparse
import ast
import logging
import sys
+from collections import defaultdict
+from importlib import import_module
from pathlib import Path
from textwrap import dedent
-from typing import Self, cast
+from typing import NamedTuple, Optional, Self, cast
from snakeoil.python_namespaces import get_submodules_of
@@ -20,23 +22,32 @@ from snakeoil.python_namespaces import get_submodules_of
logger = logging.getLogger(__name__)
+class CtxAccess(NamedTuple):
+ attr: str
+ module: "ModuleImport"
+
+
# This classes are effectively a tree that can be walked backwards as
# we recurse into the import pathways where they reference back down the
pathways.
# It is cyclic as all hell.
-class ModuleImport(ast.NodeVisitor, dict[str, "ModuleImport"]):
- __slots__ = ("root", "parent", "name", "accesses", "unscoped_access",
"ctx_imports")
-
+class ModuleImport(dict[str, "ModuleImport"]):
def __init__(self, root: Self | None, parent: Self | None, name: str) ->
None:
- if name == "pkgcore.vdb.repo_ops":
- import pdb
-
- pdb.set_trace()
self.root = self if root is None else root # oh yeah, cyclic baby.
self.parent = self.root if parent is None else parent
self.name = name
- self.accesses: set[str] = set()
- self.unscoped_access: set[str] = set()
- self.ctx_imports = dict[str, Self]()
+ # this is recordings of other modules accessing us.
+ self.accessed_by: dict[str, set["ModuleImport"]] = defaultdict(set)
+ # This is a mapping of the local name to the target namespace
+ self.ctx_imports = dict[str, CtxAccess]()
+ self.unscoped_accessers: set[str] = set()
+ self.requires_reprocessing = False
+ self.alls = None
+
+ def __hash__(self) -> int: # type: ignore
+ return hash(self.qualname)
+
+ def __eq__(self, other):
+ return self is other
@property
def qualname(self):
@@ -47,28 +58,41 @@ class ModuleImport(ast.NodeVisitor, dict[str,
"ModuleImport"]):
current = current.parent
return ".".join(reversed(l))
- def __missing__(self, name: str) -> "ModuleImport":
- assert "." not in name
- self[name] = obj = self.__class__(self.root, parent=self, name=name)
+ def create(self, chunks: list[str]) -> "ModuleImport":
+ assert len(chunks)
+ name, chunks = chunks[0], chunks[1:]
+ obj = self.setdefault(name, self.__class__(self.root, parent=self,
name=name))
+ if chunks:
+ return obj.create(chunks)
return obj
- def resolve_import(self, name: str) -> "ModuleImport":
+ def resolve_import(
+ self,
+ name: str,
+ requester: Optional["ModuleImport"],
+ ) -> tuple[list[str], "ModuleImport"]:
parts = name.split(".")
+ assert all(parts)
+ current = self
- current = self if parts[0] == "" else self.root
- while parts and parts[0] == "":
- if current is self.root:
- raise Exception(
- f"in {self.qualname}, an import tried to climb past root:
{name}"
- )
- current = current.parent
+ while parts:
+ if requester is not None:
+ current.accessed_by[parts[0]].add(requester)
+ if parts[0] not in current:
+ break
+ current = current[parts[0]]
parts = parts[1:]
- for part in parts:
- current = current[part]
- return current
+
+ try:
+ assert parts or self.root is not current
+ except AssertionError as _e:
+ # structured this way to make debugging easier
+ raise
+
+ return (parts, current)
def __str__(self) -> str:
- return f"{self.qualname}: access={self.accesses!r}
unscoped={self.unscoped_access!r} known
ctx={list(sorted(self.ctx_imports.keys()))!r}"
+ return f"{self.qualname}: access={self.accessed_by!r}
unscoped={self.unscoped_accessers!r} known
ctx={list(sorted(self.ctx_imports.keys()))!r}"
def __repr__(self):
return str(self)
@@ -77,39 +101,133 @@ class ModuleImport(ast.NodeVisitor, dict[str,
"ModuleImport"]):
class ImportCollector(ast.NodeVisitor):
__slotting_intentionally_disabled__ = True
- def __init__(self, root: ModuleImport, name: str) -> None:
+ def __init__(
+ self, root: ModuleImport, current: ModuleImport, name: str, path: Path
+ ) -> None:
self.root = root
- self.current = self.root.resolve_import(name)
+ self.current = current
+ self.path = path
+ # from semantics are directory traversals, despite how they look.
__init__ is special.
+ self.level_adjustment = 1 if path.name.startswith("__init__.") else 0
+ self.requires_reprocessing = True
+
+ def visit(self, node):
+ # reset our status
+ self.current.requires_reprocessing = False
+ super().visit(node)
+
+ def get_asname(self, alias) -> str:
+ if alias.asname:
+ return alias.asname
+ return alias.name.split(
+ ".",
+ )[0]
+
+ def update_must_reprocess(self, asname: str):
+ assert "." not in asname, asname
+ for must_reprocess in self.current.accessed_by.pop(asname, []):
+ must_reprocess.requires_reprocessing = True
def visit_Import(self, node):
for alias in node.names:
- # rework this to look for getattrs
-
- result = self.current.resolve_import(alias.name)
- result.unscoped_access.add(self.current.name)
- self.current.ctx_imports[alias.asname if alias.asname else
alias.name] = (
- result
+ asname = self.get_asname(alias)
+ self.update_must_reprocess(asname)
+
+ attrs, result = self.root.resolve_import(alias.name,
requester=self.current)
+
+ if attrs:
+ # failed to fully import. Don't inject the result into ctx;
+ # the traversal to get there will notify us of the rebuild
+ # if necessary. It's possible we're importing through a module
+ # that assembles an API by doing its own internal imports.
+ continue
+ self.current.ctx_imports[asname] = CtxAccess(
+ alias.name,
+ result,
)
+ result.unscoped_accessers.add(self.current.qualname)
def visit_ImportFrom(self, node):
+ # just rewrite into absolute pathing
+ base: list[str]
+ if node.level:
+ base = self.current.qualname.split(".")
+ level = node.level - self.level_adjustment
+ if level:
+ base = base[:-level]
+ if node.module:
+ base.extend(node.module.split("."))
+ else:
+ base = node.module.split(".")
for alias in node.names:
- if node.module is None:
- continue # not touching that with a 20ft pole.
- result = self.current.resolve_import(node.module)
- result.accesses.add(alias.name)
- self.current.ctx_imports[alias.asname if alias.asname else
alias.name] = (
- result
+ asname = self.get_asname(alias)
+ self.update_must_reprocess(asname)
+ l = base[:]
+ l.append(alias.name)
+
+ attrs, result = self.root.resolve_import(
+ ".".join(l), requester=self.current
)
+ if attrs:
+ if len(attrs) == 1:
+ # `from module import some_func`
+ result.accessed_by[attrs[0]].add(self.current)
+ # lacking that, we couldn't import it fully.
+ continue
+
+ self.current.ctx_imports[asname] = CtxAccess(
+ alias.name,
+ result,
+ )
+
+
+class AttributeCollector(ast.NodeVisitor):
+ def __init__(self, root: ModuleImport, current: ModuleImport) -> None:
+ self.root = root
+ self.current = current
def visit_Attribute(self, node):
- if not hasattr(node.value, "id"):
+ if not isinstance(node.ctx, ast.Load):
return
+
+ lookup = [node.attr]
+ value = node.value
+ try:
+ while isinstance(value, ast.Name):
+ if (last := getattr(value, "id", None)) is not None:
+ # terminus. This node won't have attr.
+ lookup.append(last)
+ break
+ lookup.append(value.attr)
+ node = node.value
+
+ except Exception as e:
+ print(
+ f"ast traversal bug in {self.current.qualname} for original
{type(node)}={node} sub-value {type(value)}={value}"
+ )
+ import pdb
+
+ pdb.set_trace()
+ raise e
+
+ lookup.reverse()
+
# this isn't confirming there isn't shadowing-
# import os
# def foon(os): ... # just got shadowed, 'os' in that ctx is not
globals()['os']
# it takes effort, and it's not worth it; this tool is already known
loose.
- if (target := self.current.ctx_imports.get(node.value.id, None)) is
not None:
- target.accesses.add(node.attr)
+
+ if (target := self.current.ctx_imports.get(lookup[0], None)) is None:
+ # it's an attribute, or an import we don't care about.
+ return
+ # build an absolute path, use resolve machinery to sort this.
+ parts = target.module.qualname.split(".") + lookup[1:]
+ parts, mod = self.root.resolve_import(".".join(parts),
requester=self.current)
+ assert mod is not self.root
+
+ if parts:
+ # attribute access into that module.
+ mod.accessed_by[parts[0]].add(self.current)
parser = argparse.ArgumentParser(
@@ -140,42 +258,83 @@ parser.add_argument(
parser.add_argument(
"targets", type=str, nargs="+", help="python namespaces to scan for usage."
)
+parser.add_argument(
+ "-v", action="store_true", default=False, dest="verbose", help="Increase
verbosity"
+)
def main(options, out, err) -> int:
root = ModuleImport(None, None, "")
+
+ source_modules: list[ModuleImport] = []
+ ast_sources = {}
+ # pre-initialize the module tree of what we care about.
for target in tuple(options.targets) + (options.source,):
- for mod in get_submodules_of(__import__(target), include_root=True):
- p = cast(str, mod.__file__)
- with Path(p).open() as f:
- tree = ast.parse(f.read(), str(p))
- ImportCollector(root, target).visit(tree)
+ for module in get_submodules_of(import_module(target),
include_root=True):
+ obj = root.create(module.__name__.split("."))
+ obj.alls = getattr(module, "__all__", None)
+ p = Path(cast(str, module.__file__))
+ with p.open("r") as f:
+ ast_sources[obj] = (p, ast.parse(f.read(), str(p)))
+ if target == options.source:
+ source_modules.append(obj)
+
+ # collect and finalize imports, then run analysis based on attribute
access.
+
+ # Note: the import collection may need to run multiple times. Consider:
+ # klass.py:
+ # __all__ = ('blah', 'foon')
+ # from .other import blah, foon
+ #
+ # If some other module tries to traverse klass.py before those from imports
have been placed, the
+ # other module will think it stopped at an attribute for 'blah'. Which
isn't correct.
+ # They internally detect this conflict and mark a boolean to indicate if a
reprocessing is needed.
+ must_be_processed = list(ast_sources)
+ for run in range(0, 10):
+ for mod in must_be_processed:
+ p, tree = ast_sources[mod]
+ ImportCollector(root, mod, mod.qualname, p).visit(tree)
+
+ if new_reprocess := [mod for mod in ast_sources if
mod.requires_reprocessing]:
+ if len(new_reprocess) == len(must_be_processed):
+ raise Exception("cycle encountered")
+ must_be_processed = new_reprocess
+ else:
+ break
+
+ for mod, (p, tree) in ast_sources.items():
+ AttributeCollector(root, mod).visit(tree)
- source_modules = list(get_submodules_of(__import__(options.source)))
results = []
for mod in source_modules:
- results.append(result := [mod.__name__])
- if (mod_alls := getattr(mod, "__all__", None)) is None:
- result.append(f"{mod.__name__} has no __all__. Not analyzing")
+ results.append(result := [mod.qualname])
+ if mod.alls is None:
+ result.append(f"{mod.qualname} has no __all__. Not analyzing")
continue
- collected = root.resolve_import(mod.__name__)
- missing = list(sorted(set(mod_alls).difference(collected.accesses)))
+ if options.verbose:
+ result.append("__all__ = (" + ", ".join(sorted(mod.alls)) + ")")
+
+ missing = list(sorted(set(mod.alls).difference(mod.accessed_by)))
if not missing:
continue
- result.append(f"all is {list(sorted(mod_alls))}")
- if collected.unscoped_access:
+ # result.append(f"all is {list(sorted(mod.alls))}")
+ if mod.unscoped_accessers:
result.append(
- f"unscoped access exists from {collected.unscoped_access!r}.
getattr() type isn't detectable current, results may be wrong"
+ f"unscoped access exists from {mod.unscoped_accessers!r}.
Results may be inaccurate"
)
result.append(f"possibly unused {missing}")
first = ""
for block in sorted(results, key=lambda l: l[0]):
- if len(block) == 1:
+ if len(block) == 1 and not options.verbose:
continue
out.write(f"{first}{block[0]}\n")
first = "\n"
+ if len(block) == 1:
+ out.write(" __all__ is fully used\n")
+ continue
+
for lines in block[1:]:
out.write(f" {lines}\n")