commit: b1d90aef11b191a22184895021068b56b7c0e0ad
Author: Brian Harring <ferringb <AT> gmail <DOT> com>
AuthorDate: Fri Dec 5 19:55:13 2025 +0000
Commit: Brian Harring <ferringb <AT> gmail <DOT> com>
CommitDate: Fri Dec 5 20:48:21 2025 +0000
URL:
https://gitweb.gentoo.org/proj/pkgcore/snakeoil.git/commit/?id=b1d90aef
chore(sequences): annotate and deprecate
Also rewire implementations into the underlying base
implementation where possible; modern hardware and python
is *drastically* faster now where the trade off of an extra
frame is a nothing-burger.
Signed-off-by: Brian Harring <ferringb <AT> gmail.com>
src/snakeoil/sequences.py | 145 ++++++++++++++++++++++++++++++++++------------
tests/test_sequences.py | 20 ++++++-
2 files changed, 128 insertions(+), 37 deletions(-)
diff --git a/src/snakeoil/sequences.py b/src/snakeoil/sequences.py
index ba64d84..3d82a85 100644
--- a/src/snakeoil/sequences.py
+++ b/src/snakeoil/sequences.py
@@ -11,11 +11,30 @@ __all__ = (
"split_elements",
)
-from typing import Any, Callable, Iterable, Type
+from typing import (
+ Callable,
+ Generic,
+ Hashable,
+ Iterable,
+ Iterator,
+ Literal,
+ NamedTuple,
+ TypeAlias,
+ TypeVar,
+ overload,
+)
+
+from snakeoil.deprecation import deprecated, suppress_deprecation_warning
from .iterables import expandable_chain
+T = TypeVar("T")
+H = TypeVar("H", bound=Hashable)
+
+@deprecated(
+    """snakeoil.sequences.unstable_unique is deprecated. Use set() instead, it
will have superior performance characteristics albeit will allocate more than
this implementation, which sorts the sequence"""
+)
def unstable_unique(sequence):
"""Given a sequence, return a list of the unique items without preserving
ordering."""
@@ -24,7 +43,8 @@ def unstable_unique(sequence):
except TypeError:
# if it doesn't support len, assume it's an iterable
# and fallback to the slower stable_unique
- return stable_unique(sequence)
+ with suppress_deprecation_warning():
+ return stable_unique(sequence)
# assume all elements are hashable, if so, it's linear
try:
return list(set(sequence))
@@ -55,23 +75,30 @@ def unstable_unique(sequence):
return u
-def stable_unique(iterable):
+@deprecated(
+    "snakeoil.sequences.stable_unique is deprecated. Use
snakeoil.sequences.unique_stable but be aware it now requires all items be
hashable"
+)
+def stable_unique(iterable: Iterable[T]) -> list[T]:
"""Given a sequence, return a list of the unique items while preserving
ordering.
For performance reasons, only use this if you really do need to preserve
the ordering.
"""
- return list(iter_stable_unique(iterable))
+ with suppress_deprecation_warning():
+ return list(iter_stable_unique(iterable))
-def iter_stable_unique(iterable):
+@deprecated(
+    "snakeoil.sequences.iter_stable_unique is deprecated. Use
snakeoil.sequences.unique_stable but be aware it now requires all items be
hashable"
+)
+def iter_stable_unique(iterable: Iterable[T]) -> Iterator[T]:
"""Given a sequence, yield unique items while preserving ordering.
For performance reasons, only use this if you really do need to preserve
the ordering.
"""
- seen = set()
- unhashable_seen = []
+ seen: set[T] = set()
+ unhashable_seen: list[T] = []
iterable = iter(iterable)
# the reason for this structuring is purely speed- entering try/except
# repeatedly is costly, thus structure it to penalize the unhashables
@@ -79,7 +106,7 @@ def iter_stable_unique(iterable):
singleton = object()
while True:
- x = singleton
+ x: T = singleton # pyright: ignore[reportAssignmentType]
try:
for x in iterable:
if x not in seen:
@@ -97,34 +124,46 @@ def iter_stable_unique(iterable):
break
+def unique_stable(iterable: Iterable[H]) -> Iterator[H]:
+ """Given an iterator, normalize it yielding items in stable ordering,
removing duplicates"""
+ s: set[H] = set()
+ for thing in iterable:
+ if thing not in s:
+ yield thing
+ s.add(thing)
+
+
+# in py3.11 it's impossible to represent this recursive typing of iterable,
thus
+# the typing is wrong. When py3.12 is the min, change T_recursive to this.
+# T_recursive = TypeAliasType("T_recursive", Iterable["T_recursive[T]"] | T,
type_params=(T,))
+T_recursive: TypeAlias = Iterable[T]
+
+
def iflatten_instance(
- l: Iterable, skip_flattening: Iterable[Type] = (str, bytes)
-) -> Iterable:
+ iterable: T_recursive, skip_flattening: tuple[type, ...] | type = (str,
bytes), /
+) -> Iterable[T | str | bytes]:
"""collapse [[1],2] into [1,2]
:param skip_flattening: list of classes to not descend through
:return: this generator yields each item that cannot be flattened (or is
skipped due to being a instance of ``skip_flattening``)
"""
- if isinstance(l, skip_flattening):
- yield l
- return
- iters = expandable_chain(l)
- try:
- while True:
- x = next(iters)
- if hasattr(x, "__iter__") and not (
- isinstance(x, skip_flattening)
- or (isinstance(x, (str, bytes)) and len(x) == 1)
- ):
- iters.appendleft(x)
- else:
- yield x
- except StopIteration:
- pass
+ def f(x):
+ # Yes, that logic is weird. It's historical compatibility that should
be ripped out,
+ # but exists as a safe guard since bytes and str being iterable
screws up a lot of python
+ # code.
+ return isinstance(x, skip_flattening) or (
+ isinstance(x, (bytes, str)) and len(x) == 1
+ )
+
+ return iflatten_func(iterable, f)
-def iflatten_func(l: Iterable, skip_func: Callable[[Any], bool]) -> Iterable:
+
+# Like iflatten instance, this is impossible to properly type in 3.11
+def iflatten_func(
+ iterable: T_recursive | T, skip_func: Callable[[T], bool], /
+) -> Iterable[T]:
"""collapse [[1],2] into [1,2]
:param skip_func: a callable that returns True when iflatten_func should
@@ -132,10 +171,10 @@ def iflatten_func(l: Iterable, skip_func: Callable[[Any],
bool]) -> Iterable:
:return: this generator yields each item that cannot be flattened (or is
skipped due to a True result from skip_func)
"""
- if skip_func(l):
- yield l
+ if skip_func(iterable): # pyright: ignore[reportArgumentType]
+ yield iterable
return
- iters = expandable_chain(l)
+ iters = expandable_chain[T](iterable) # pyright:
ignore[reportArgumentType]
try:
while True:
x = next(iters)
@@ -147,7 +186,25 @@ def iflatten_func(l: Iterable, skip_func: Callable[[Any],
bool]) -> Iterable:
pass
-def predicate_split(func, stream, key=None):
+T2 = TypeVar("T2")
+
+
+@overload
+def predicate_split(
+ func: Callable[[T], bool], stream: Iterable[T], /, key: Literal[None] =
None
+): ...
+
+
+@overload
+def predicate_split(
+ func: Callable[[T2], bool],
+ stream: Iterable[T],
+ /,
+ key: Callable[[T], T2] | None = None,
+): ...
+
+
+def predicate_split(func, stream, /, key=None) -> tuple[list[T], list[T]]:
"""
Given a stream and a function, split the stream into two sequences based on
the results of the func for that item
@@ -166,6 +223,7 @@ def predicate_split(func, stream, key=None):
>>> assert odd == [1, 3, 5, 7, 9]
>>> assert even == [0, 2, 4, 6, 8]
"""
+ stream = iter(stream)
true_l, false_l = [], []
tappend, fappend = true_l.append, false_l.append
# needs to be fast... this this since a simple
@@ -185,8 +243,15 @@ def predicate_split(func, stream, key=None):
return false_l, true_l
-def split_negations(iterable, func=str):
- """ "Split an iterable into negative and positive elements.
+class BoolSplitResults(Generic[T], NamedTuple):
+    negative: tuple[T, ...]
+    positive: tuple[T, ...]
+
+
+def split_negations(
+ iterable: Iterable[str], func: Callable[[str], T] = str
+) -> BoolSplitResults[T]:
+ """Split an iterable into negative and positive elements.
:param iterable: iterable targeted for splitting
:param func: wrapper method to modify tokens
@@ -205,10 +270,18 @@ def split_negations(iterable, func=str):
obj = func(token)
if obj is not None:
l.append(obj)
- return tuple(neg), tuple(pos)
+ return BoolSplitResults[T](tuple(neg), tuple(pos))
+
+
+class BoolTernaryResults(Generic[T], NamedTuple):
+    negative: tuple[T, ...]
+    neutral: tuple[T, ...]
+    positive: tuple[T, ...]
-def split_elements(iterable, func=str):
+def split_elements(
+ iterable: Iterable[str], func: Callable[[str], T] = str
+) -> BoolTernaryResults[T]:
    """Split an iterable into negative, neutral, and positive elements.
:param iterable: iterable targeted for splitting
@@ -229,4 +302,4 @@ def split_elements(iterable, func=str):
obj = func(token)
if obj is not None:
l.append(obj)
- return tuple(neg), tuple(neu), tuple(pos)
+ return BoolTernaryResults[T](tuple(neg), tuple(neu), tuple(pos))
diff --git a/tests/test_sequences.py b/tests/test_sequences.py
index ab4a9fe..f9866ed 100644
--- a/tests/test_sequences.py
+++ b/tests/test_sequences.py
@@ -5,7 +5,12 @@ from operator import itemgetter
import pytest
from snakeoil import sequences
-from snakeoil.sequences import iter_stable_unique, split_elements,
split_negations
+from snakeoil.sequences import (
+ iter_stable_unique,
+ split_elements,
+ split_negations,
+ unique_stable,
+)
class UnhashableComplex(complex):
@@ -266,3 +271,16 @@ class TestSplitElements:
tuple(range(100)),
tuple(range(100)),
)
+
+
+def test_unique_stable():
+ with pytest.raises(TypeError):
+ # verify our assumptions of object being unhashable
+ hash([])
+ with pytest.raises(TypeError):
+ list(unique_stable([[]])) # pyright: ignore[reportArgumentType]
+
+ # force iter so it's not possible to infer from the type.
+ assert ["c", "a", "b"] == list(
+ unique_stable(iter(["c", "a", "a", "c", "a", "b", "b"]))
+ )