commit:     ff280478d9be3b3abac919c4e553270a91c2d341
Author:     Brian Harring <ferringb <AT> gmail <DOT> com>
AuthorDate: Sun Nov 30 18:00:56 2025 +0000
Commit:     Brian Harring <ferringb <AT> gmail <DOT> com>
CommitDate: Sun Nov 30 20:01:45 2025 +0000
URL:        
https://gitweb.gentoo.org/proj/pkgcore/snakeoil.git/commit/?id=ff280478

chore: find_unused_exports basically fully works now

This won't pick up getattr() or importlib usage,
nor aliasing of an imported name via assignment,
but it catches the rest.

Signed-off-by: Brian Harring <ferringb <AT> gmail.com>

 src/snakeoil/tools/find_unused_exports.py | 275 +++++++++++++++++++++++-------
 1 file changed, 217 insertions(+), 58 deletions(-)

diff --git a/src/snakeoil/tools/find_unused_exports.py 
b/src/snakeoil/tools/find_unused_exports.py
index 36addae..7a6f43d 100644
--- a/src/snakeoil/tools/find_unused_exports.py
+++ b/src/snakeoil/tools/find_unused_exports.py
@@ -7,9 +7,11 @@ import argparse
 import ast
 import logging
 import sys
+from collections import defaultdict
+from importlib import import_module
 from pathlib import Path
 from textwrap import dedent
-from typing import Self, cast
+from typing import NamedTuple, Optional, Self, cast
 
 from snakeoil.python_namespaces import get_submodules_of
 
@@ -20,23 +22,32 @@ from snakeoil.python_namespaces import get_submodules_of
 logger = logging.getLogger(__name__)
 
 
+class CtxAccess(NamedTuple):
+    attr: str
+    module: "ModuleImport"
+
+
 # This classes are effectively a tree that can be walked backwards as
 # we recurse into the import pathways where they reference back down the 
pathways.
 # It is cyclic as all hell.
-class ModuleImport(ast.NodeVisitor, dict[str, "ModuleImport"]):
-    __slots__ = ("root", "parent", "name", "accesses", "unscoped_access", 
"ctx_imports")
-
+class ModuleImport(dict[str, "ModuleImport"]):
     def __init__(self, root: Self | None, parent: Self | None, name: str) -> 
None:
-        if name == "pkgcore.vdb.repo_ops":
-            import pdb
-
-            pdb.set_trace()
         self.root = self if root is None else root  # oh yeah, cyclic baby.
         self.parent = self.root if parent is None else parent
         self.name = name
-        self.accesses: set[str] = set()
-        self.unscoped_access: set[str] = set()
-        self.ctx_imports = dict[str, Self]()
+        # this is recordings of other modules accessing us.
+        self.accessed_by: dict[str, set["ModuleImport"]] = defaultdict(set)
+        # This is a mapping of the local name to the target namespace
+        self.ctx_imports = dict[str, CtxAccess]()
+        self.unscoped_accessers: set[str] = set()
+        self.requires_reprocessing = False
+        self.alls = None
+
+    def __hash__(self) -> int:  # type: ignore
+        return hash(self.qualname)
+
+    def __eq__(self, other):
+        return self is other
 
     @property
     def qualname(self):
@@ -47,28 +58,41 @@ class ModuleImport(ast.NodeVisitor, dict[str, 
"ModuleImport"]):
             current = current.parent
         return ".".join(reversed(l))
 
-    def __missing__(self, name: str) -> "ModuleImport":
-        assert "." not in name
-        self[name] = obj = self.__class__(self.root, parent=self, name=name)
+    def create(self, chunks: list[str]) -> "ModuleImport":
+        assert len(chunks)
+        name, chunks = chunks[0], chunks[1:]
+        obj = self.setdefault(name, self.__class__(self.root, parent=self, 
name=name))
+        if chunks:
+            return obj.create(chunks)
         return obj
 
-    def resolve_import(self, name: str) -> "ModuleImport":
+    def resolve_import(
+        self,
+        name: str,
+        requester: Optional["ModuleImport"],
+    ) -> tuple[list[str], "ModuleImport"]:
         parts = name.split(".")
+        assert all(parts)
+        current = self
 
-        current = self if parts[0] == "" else self.root
-        while parts and parts[0] == "":
-            if current is self.root:
-                raise Exception(
-                    f"in {self.qualname}, an import tried to climb past root: 
{name}"
-                )
-            current = current.parent
+        while parts:
+            if requester is not None:
+                current.accessed_by[parts[0]].add(requester)
+            if parts[0] not in current:
+                break
+            current = current[parts[0]]
             parts = parts[1:]
-        for part in parts:
-            current = current[part]
-        return current
+
+        try:
+            assert parts or self.root is not current
+        except AssertionError as _e:
+            # structured this way to make debugging easier
+            raise
+
+        return (parts, current)
 
     def __str__(self) -> str:
-        return f"{self.qualname}: access={self.accesses!r} 
unscoped={self.unscoped_access!r} known 
ctx={list(sorted(self.ctx_imports.keys()))!r}"
+        return f"{self.qualname}: access={self.accessed_by!r} 
unscoped={self.unscoped_accessers!r} known 
ctx={list(sorted(self.ctx_imports.keys()))!r}"
 
     def __repr__(self):
         return str(self)
@@ -77,39 +101,133 @@ class ModuleImport(ast.NodeVisitor, dict[str, 
"ModuleImport"]):
 class ImportCollector(ast.NodeVisitor):
     __slotting_intentionally_disabled__ = True
 
-    def __init__(self, root: ModuleImport, name: str) -> None:
+    def __init__(
+        self, root: ModuleImport, current: ModuleImport, name: str, path: Path
+    ) -> None:
         self.root = root
-        self.current = self.root.resolve_import(name)
+        self.current = current
+        self.path = path
+        # from semantics are directory traversals, despite how they look.  
__init__ is special.
+        self.level_adjustment = 1 if path.name.startswith("__init__.") else 0
+        self.requires_reprocessing = True
+
+    def visit(self, node):
+        # reset our status
+        self.current.requires_reprocessing = False
+        super().visit(node)
+
+    def get_asname(self, alias) -> str:
+        if alias.asname:
+            return alias.asname
+        return alias.name.split(
+            ".",
+        )[0]
+
+    def update_must_reprocess(self, asname: str):
+        assert "." not in asname, asname
+        for must_reprocess in self.current.accessed_by.pop(asname, []):
+            must_reprocess.requires_reprocessing = True
 
     def visit_Import(self, node):
         for alias in node.names:
-            # rework this to look for getattrs
-
-            result = self.current.resolve_import(alias.name)
-            result.unscoped_access.add(self.current.name)
-            self.current.ctx_imports[alias.asname if alias.asname else 
alias.name] = (
-                result
+            asname = self.get_asname(alias)
+            self.update_must_reprocess(asname)
+
+            attrs, result = self.root.resolve_import(alias.name, 
requester=self.current)
+
+            if attrs:
+                # failed to fully import.  Don't inject the result into ctx;
+                # the traversal to get there will notify us of the rebuild
+                # if necessary.  It's possible we're importing through a module
+                # that assembles an API via doing its own internal imports.
+                continue
+            self.current.ctx_imports[asname] = CtxAccess(
+                alias.name,
+                result,
             )
+            result.unscoped_accessers.add(self.current.qualname)
 
     def visit_ImportFrom(self, node):
+        # just rewrite into absolute pathing
+        base: list[str]
+        if node.level:
+            base = self.current.qualname.split(".")
+            level = node.level - self.level_adjustment
+            if level:
+                base = base[:-level]
+            if node.module:
+                base.extend(node.module.split("."))
+        else:
+            base = node.module.split(".")
         for alias in node.names:
-            if node.module is None:
-                continue  # not touching that with a 20ft pole.
-            result = self.current.resolve_import(node.module)
-            result.accesses.add(alias.name)
-            self.current.ctx_imports[alias.asname if alias.asname else 
alias.name] = (
-                result
+            asname = self.get_asname(alias)
+            self.update_must_reprocess(asname)
+            l = base[:]
+            l.append(alias.name)
+
+            attrs, result = self.root.resolve_import(
+                ".".join(l), requester=self.current
             )
+            if attrs:
+                if len(attrs) == 1:
+                    # `from module import some_func`
+                    result.accessed_by[attrs[0]].add(self.current)
+                # lacking that, we couldn't import it fully.
+                continue
+
+            self.current.ctx_imports[asname] = CtxAccess(
+                alias.name,
+                result,
+            )
+
+
+class AttributeCollector(ast.NodeVisitor):
+    def __init__(self, root: ModuleImport, current: ModuleImport) -> None:
+        self.root = root
+        self.current = current
 
     def visit_Attribute(self, node):
-        if not hasattr(node.value, "id"):
+        if not isinstance(node.ctx, ast.Load):
             return
+
+        lookup = [node.attr]
+        value = node.value
+        try:
+            while isinstance(value, ast.Name):
+                if (last := getattr(value, "id", None)) is not None:
+                    # terminus.  This node won't have attr.
+                    lookup.append(last)
+                    break
+                lookup.append(value.attr)
+                node = node.value
+
+        except Exception as e:
+            print(
+                f"ast traversal bug in {self.current.qualname} for original 
{type(node)}={node} sub-value {type(value)}={value}"
+            )
+            import pdb
+
+            pdb.set_trace()
+            raise e
+
+        lookup.reverse()
+
         # this isn't confirming there isn't shadowing-
         # import os
         # def foon(os): ... # just got shadowed, 'os' in that ctx is not 
globals()['os']
         # it takes effort, and it's not worth it; this tool is already known 
loose.
-        if (target := self.current.ctx_imports.get(node.value.id, None)) is 
not None:
-            target.accesses.add(node.attr)
+
+        if (target := self.current.ctx_imports.get(lookup[0], None)) is None:
+            # it's an attribute, or an import we don't care about.
+            return
+        # build an absolute path, use resolve machinery to sort this.
+        parts = target.module.qualname.split(".") + lookup[1:]
+        parts, mod = self.root.resolve_import(".".join(parts), 
requester=self.current)
+        assert mod is not self.root
+
+        if parts:
+            # attribute access into that module.
+            mod.accessed_by[parts[0]].add(self.current)
 
 
 parser = argparse.ArgumentParser(
@@ -140,42 +258,83 @@ parser.add_argument(
 parser.add_argument(
     "targets", type=str, nargs="+", help="python namespaces to scan for usage."
 )
+parser.add_argument(
+    "-v", action="store_true", default=False, dest="verbose", help="Increase 
verbosity"
+)
 
 
 def main(options, out, err) -> int:
     root = ModuleImport(None, None, "")
+
+    source_modules: list[ModuleImport] = []
+    ast_sources = {}
+    # pre-initialize the module tree of what we care about.
     for target in tuple(options.targets) + (options.source,):
-        for mod in get_submodules_of(__import__(target), include_root=True):
-            p = cast(str, mod.__file__)
-            with Path(p).open() as f:
-                tree = ast.parse(f.read(), str(p))
-                ImportCollector(root, target).visit(tree)
+        for module in get_submodules_of(import_module(target), 
include_root=True):
+            obj = root.create(module.__name__.split("."))
+            obj.alls = getattr(module, "__all__", None)
+            p = Path(cast(str, module.__file__))
+            with p.open("r") as f:
+                ast_sources[obj] = (p, ast.parse(f.read(), str(p)))
+            if target == options.source:
+                source_modules.append(obj)
+
+    # collect and finalize imports, then run analysis based on attribute 
access.
+
+    # Note: the import collection may need to run multiple times.  Consider:
+    # klass.py:
+    # __all__ = ('blah', 'foon')
+    # from .other import blah, foon
+    #
+    # If some other module tries to traverse klass.py before those from imports 
have been placed, the
+    # other module will think it stopped at an attribute for 'blah'.  Which 
isn't correct.
+    # They internally detect this conflict and mark a boolean to indicate if a 
reprocessing is needed.
+    must_be_processed = list(ast_sources)
+    for run in range(0, 10):
+        for mod in must_be_processed:
+            p, tree = ast_sources[mod]
+            ImportCollector(root, mod, mod.qualname, p).visit(tree)
+
+        if new_reprocess := [mod for mod in ast_sources if 
mod.requires_reprocessing]:
+            if len(new_reprocess) == len(must_be_processed):
+                raise Exception("cycle encountered")
+            must_be_processed = new_reprocess
+        else:
+            break
+
+    for mod, (p, tree) in ast_sources.items():
+        AttributeCollector(root, mod).visit(tree)
 
-    source_modules = list(get_submodules_of(__import__(options.source)))
     results = []
     for mod in source_modules:
-        results.append(result := [mod.__name__])
-        if (mod_alls := getattr(mod, "__all__", None)) is None:
-            result.append(f"{mod.__name__} has no __all__.  Not analyzing")
+        results.append(result := [mod.qualname])
+        if mod.alls is None:
+            result.append(f"{mod.qualname} has no __all__.  Not analyzing")
             continue
-        collected = root.resolve_import(mod.__name__)
-        missing = list(sorted(set(mod_alls).difference(collected.accesses)))
+        if options.verbose:
+            result.append("__all__ = (" + ", ".join(sorted(mod.alls)) + ")")
+
+        missing = list(sorted(set(mod.alls).difference(mod.accessed_by)))
         if not missing:
             continue
-        result.append(f"all is {list(sorted(mod_alls))}")
-        if collected.unscoped_access:
+        # result.append(f"all is {list(sorted(mod.alls))}")
+        if mod.unscoped_accessers:
             result.append(
-                f"unscoped access exists from {collected.unscoped_access!r}.  
getattr() type isn't detectable current, results may be wrong"
+                f"unscoped access exists from {mod.unscoped_accessers!r}.  
Results may be inaccurate"
             )
 
         result.append(f"possibly unused {missing}")
 
     first = ""
     for block in sorted(results, key=lambda l: l[0]):
-        if len(block) == 1:
+        if len(block) == 1 and not options.verbose:
             continue
         out.write(f"{first}{block[0]}\n")
         first = "\n"
+        if len(block) == 1:
+            out.write("  __all__ is fully used\n")
+            continue
+
         for lines in block[1:]:
             out.write(f"  {lines}\n")
 

Reply via email to