commit:     b1d90aef11b191a22184895021068b56b7c0e0ad
Author:     Brian Harring <ferringb <AT> gmail <DOT> com>
AuthorDate: Fri Dec  5 19:55:13 2025 +0000
Commit:     Brian Harring <ferringb <AT> gmail <DOT> com>
CommitDate: Fri Dec  5 20:48:21 2025 +0000
URL:        
https://gitweb.gentoo.org/proj/pkgcore/snakeoil.git/commit/?id=b1d90aef

chore(sequences): annotate and deprecate

Also rewire implementations into the underlying base
implementation where possible; modern hardware and python
is *drastically* faster now where the trade off of an extra
frame is a no-burger.

Signed-off-by: Brian Harring <ferringb <AT> gmail.com>

 src/snakeoil/sequences.py | 145 ++++++++++++++++++++++++++++++++++------------
 tests/test_sequences.py   |  20 ++++++-
 2 files changed, 128 insertions(+), 37 deletions(-)

diff --git a/src/snakeoil/sequences.py b/src/snakeoil/sequences.py
index ba64d84..3d82a85 100644
--- a/src/snakeoil/sequences.py
+++ b/src/snakeoil/sequences.py
@@ -11,11 +11,30 @@ __all__ = (
     "split_elements",
 )
 
-from typing import Any, Callable, Iterable, Type
+from typing import (
+    Callable,
+    Generic,
+    Hashable,
+    Iterable,
+    Iterator,
+    Literal,
+    NamedTuple,
+    TypeAlias,
+    TypeVar,
+    overload,
+)
+
+from snakeoil.deprecation import deprecated, suppress_deprecation_warning
 
 from .iterables import expandable_chain
 
+T = TypeVar("T")
+H = TypeVar("H", bound=Hashable)
 
+
+@deprecated(
+    """snakeoil.klass.unstable_unique is deprecated.  Use set() instead, it 
will have superior performance characteristics albeit will allocate more than 
this implementationd which sorted the sequence"""
+)
 def unstable_unique(sequence):
     """Given a sequence, return a list of the unique items without preserving 
ordering."""
 
@@ -24,7 +43,8 @@ def unstable_unique(sequence):
     except TypeError:
         # if it doesn't support len, assume it's an iterable
         # and fallback to the slower stable_unique
-        return stable_unique(sequence)
+        with suppress_deprecation_warning():
+            return stable_unique(sequence)
     # assume all elements are hashable, if so, it's linear
     try:
         return list(set(sequence))
@@ -55,23 +75,30 @@ def unstable_unique(sequence):
     return u
 
 
-def stable_unique(iterable):
+@deprecated(
+    "snakeoil.sequence.stable_unique is deprecated.  Use 
snakeoil.sequence.unique_stable but be aware it now requires all items be 
hashable"
+)
+def stable_unique(iterable: Iterable[T]) -> list[T]:
     """Given a sequence, return a list of the unique items while preserving 
ordering.
 
     For performance reasons, only use this if you really do need to preserve
     the ordering.
     """
-    return list(iter_stable_unique(iterable))
+    with suppress_deprecation_warning():
+        return list(iter_stable_unique(iterable))
 
 
-def iter_stable_unique(iterable):
+@deprecated(
+    "snakeoil.sequence.iter_stable_unique is deprecated.  Use 
snakeoil.sequence.unique_stable but be aware it now requires all items be 
hashable"
+)
+def iter_stable_unique(iterable: Iterable[T]) -> Iterator[T]:
     """Given a sequence, yield unique items while preserving ordering.
 
     For performance reasons, only use this if you really do need to preserve
     the ordering.
     """
-    seen = set()
-    unhashable_seen = []
+    seen: set[T] = set()
+    unhashable_seen: list[T] = []
     iterable = iter(iterable)
     # the reason for this structuring is purely speed- entering try/except
     # repeatedly is costly, thus structure it to penalize the unhashables
@@ -79,7 +106,7 @@ def iter_stable_unique(iterable):
     singleton = object()
 
     while True:
-        x = singleton
+        x: T = singleton  # pyright: ignore[reportAssignmentType]
         try:
             for x in iterable:
                 if x not in seen:
@@ -97,34 +124,46 @@ def iter_stable_unique(iterable):
         break
 
 
+def unique_stable(iterable: Iterable[H]) -> Iterator[H]:
+    """Given an iterator, normalize it yielding items in stable ordering, 
removing duplicates"""
+    s: set[H] = set()
+    for thing in iterable:
+        if thing not in s:
+            yield thing
+            s.add(thing)
+
+
+# in py3.11 it's impossible to represent this recursive typing of iterable, 
thus
+# the typing is wrong.  When py3.12 is the min, change T_recursive to this.
+# T_recursive = TypeAliasType("T_recursive", Iterable["T_recursive[T]"] | T, 
type_params=(T,))
+T_recursive: TypeAlias = Iterable[T]
+
+
 def iflatten_instance(
-    l: Iterable, skip_flattening: Iterable[Type] = (str, bytes)
-) -> Iterable:
+    iterable: T_recursive, skip_flattening: tuple[type, ...] | type = (str, 
bytes), /
+) -> Iterable[T | str | bytes]:
     """collapse [[1],2] into [1,2]
 
     :param skip_flattening: list of classes to not descend through
     :return: this generator yields each item that cannot be flattened (or is
         skipped due to being a instance of ``skip_flattening``)
     """
-    if isinstance(l, skip_flattening):
-        yield l
-        return
-    iters = expandable_chain(l)
-    try:
-        while True:
-            x = next(iters)
-            if hasattr(x, "__iter__") and not (
-                isinstance(x, skip_flattening)
-                or (isinstance(x, (str, bytes)) and len(x) == 1)
-            ):
-                iters.appendleft(x)
-            else:
-                yield x
-    except StopIteration:
-        pass
 
+    def f(x):
+        # Yes, that logic is weird.  It's historical compatibility that should 
be ripped out,
+        # but exists as a safe  guard since bytes and str being iterable 
screws up a lot of python
+        # code.
+        return isinstance(x, skip_flattening) or (
+            isinstance(x, (bytes, str)) and len(x) == 1
+        )
+
+    return iflatten_func(iterable, f)
 
-def iflatten_func(l: Iterable, skip_func: Callable[[Any], bool]) -> Iterable:
+
+# Like iflatten instance, this is impossible to properly type in 3.11
+def iflatten_func(
+    iterable: T_recursive | T, skip_func: Callable[[T], bool], /
+) -> Iterable[T]:
     """collapse [[1],2] into [1,2]
 
     :param skip_func: a callable that returns True when iflatten_func should
@@ -132,10 +171,10 @@ def iflatten_func(l: Iterable, skip_func: Callable[[Any], 
bool]) -> Iterable:
     :return: this generator yields each item that cannot be flattened (or is
         skipped due to a True result from skip_func)
     """
-    if skip_func(l):
-        yield l
+    if skip_func(iterable):  # pyright: ignore[reportArgumentType]
+        yield iterable
         return
-    iters = expandable_chain(l)
+    iters = expandable_chain[T](iterable)  # pyright: 
ignore[reportArgumentType]
     try:
         while True:
             x = next(iters)
@@ -147,7 +186,25 @@ def iflatten_func(l: Iterable, skip_func: Callable[[Any], 
bool]) -> Iterable:
         pass
 
 
-def predicate_split(func, stream, key=None):
+T2 = TypeVar("T2")
+
+
+@overload
+def predicate_split(
+    func: Callable[[T], bool], stream: Iterable[T], /, key: Literal[None] = 
None
+): ...
+
+
+@overload
+def predicate_split(
+    func: Callable[[T2], bool],
+    stream: Iterable[T],
+    /,
+    key: Callable[[T], T2] | None = None,
+): ...
+
+
+def predicate_split(func, stream, /, key=None) -> tuple[list[T], list[T]]:
     """
     Given a stream and a function, split the stream into two sequences based on
     the results of the func for that item
@@ -166,6 +223,7 @@ def predicate_split(func, stream, key=None):
     >>> assert odd == [1, 3, 5, 7, 9]
     >>> assert even == [0, 2, 4, 6, 8]
     """
+    stream = iter(stream)
     true_l, false_l = [], []
     tappend, fappend = true_l.append, false_l.append
     # needs to be fast... this this since a simple
@@ -185,8 +243,15 @@ def predicate_split(func, stream, key=None):
     return false_l, true_l
 
 
-def split_negations(iterable, func=str):
-    """ "Split an iterable into negative and positive elements.
+class BoolSplitResults(Generic[T], NamedTuple):
+    negative: tuple[T]
+    positive: tuple[T]
+
+
+def split_negations(
+    iterable: Iterable[str], func: Callable[[str], T] = str
+) -> BoolSplitResults[T]:
+    """Split an iterable into negative and positive elements.
 
     :param iterable: iterable targeted for splitting
     :param func: wrapper method to modify tokens
@@ -205,10 +270,18 @@ def split_negations(iterable, func=str):
         obj = func(token)
         if obj is not None:
             l.append(obj)
-    return tuple(neg), tuple(pos)
+    return BoolSplitResults[T](tuple(neg), tuple(pos))
+
+
+class BoolTernaryResults(Generic[T], NamedTuple):
+    negative: tuple[T]
+    neutral: tuple[T]
+    positive: tuple[T]
 
 
-def split_elements(iterable, func=str):
+def split_elements(
+    iterable: Iterable[str], func: Callable[[str], T] = str
+) -> BoolTernaryResults[T]:
     """ "Split an iterable into negative, neutral, and positive elements.
 
     :param iterable: iterable targeted for splitting
@@ -229,4 +302,4 @@ def split_elements(iterable, func=str):
         obj = func(token)
         if obj is not None:
             l.append(obj)
-    return tuple(neg), tuple(neu), tuple(pos)
+    return BoolTernaryResults[T](tuple(neg), tuple(neu), tuple(pos))

diff --git a/tests/test_sequences.py b/tests/test_sequences.py
index ab4a9fe..f9866ed 100644
--- a/tests/test_sequences.py
+++ b/tests/test_sequences.py
@@ -5,7 +5,12 @@ from operator import itemgetter
 import pytest
 
 from snakeoil import sequences
-from snakeoil.sequences import iter_stable_unique, split_elements, 
split_negations
+from snakeoil.sequences import (
+    iter_stable_unique,
+    split_elements,
+    split_negations,
+    unique_stable,
+)
 
 
 class UnhashableComplex(complex):
@@ -266,3 +271,16 @@ class TestSplitElements:
             tuple(range(100)),
             tuple(range(100)),
         )
+
+
+def test_unique_stable():
+    with pytest.raises(TypeError):
+        # verify our assumptions of object being unhashable
+        hash([])
+    with pytest.raises(TypeError):
+        list(unique_stable([[]]))  # pyright: ignore[reportArgumentType]
+
+    # force iter so it's not possible to infer from the type.
+    assert ["c", "a", "b"] == list(
+        unique_stable(iter(["c", "a", "a", "c", "a", "b", "b"]))
+    )

Reply via email to