ForeverAngry commented on code in PR #2695:
URL: https://github.com/apache/iceberg-python/pull/2695#discussion_r2591081142
##########
pyiceberg/expressions/visitors.py:
##########
@@ -1975,11 +1977,119 @@ def residual_for(self, partition_data: Record) ->
BooleanExpression:
return self.expr
+_DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE = 128
+
+
+class ResidualEvaluatorCache:
+ """Thread-safe LRU cache for ResidualEvaluator instances.
+
+ Caches ResidualEvaluators to avoid repeated instantiation and
initialization
+ overhead when scanning multiple data files with identical partition specs,
+ expressions, schemas, and case sensitivity settings.
+ """
+
+ _cache: Dict[str, ResidualEvaluator]
+ _maxsize: int
+ _lock: threading.RLock
+
+ def __init__(self, maxsize: int = _DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE)
-> None:
+ """Initialize the cache.
+
+ Args:
+ maxsize: Maximum number of evaluators to cache. Defaults to 128.
+ """
+ self._cache = {}
+ self._maxsize = maxsize
+ self._lock = threading.RLock()
+
+ @staticmethod
+ def _make_key(
+ spec_id: int,
+ expr: BooleanExpression,
+ case_sensitive: bool,
+ schema_id: int | None = None,
+ ) -> str:
+ """Create deterministic cache key from evaluator parameters.
+
+ Args:
+ spec_id: Partition spec identifier.
+ expr: Filter expression tree.
+ case_sensitive: Case-sensitive flag.
+ schema_id: Optional schema identifier.
+
+ Returns:
+ 32-character MD5 hex string cache key.
+ """
+ key_parts = f"{spec_id}#{repr(expr)}#{case_sensitive}#{schema_id}"
+ return hashlib.md5(key_parts.encode()).hexdigest()
+
+ def get(
+ self,
+ spec: PartitionSpec,
+ expr: BooleanExpression,
+ case_sensitive: bool,
+ schema: Schema,
+ ) -> ResidualEvaluator | None:
+ """Retrieve cached evaluator if it exists.
+
+ Args:
+ spec: Partition specification.
+ expr: Filter expression.
+ case_sensitive: Case sensitivity flag.
+ schema: Table schema.
+
+ Returns:
+ Cached ResidualEvaluator or None.
+ """
+ cache_key = self._make_key(spec.spec_id, expr, case_sensitive,
schema.schema_id)
+ with self._lock:
+ return self._cache.get(cache_key)
+
+ def put(
+ self,
+ spec: PartitionSpec,
+ expr: BooleanExpression,
+ case_sensitive: bool,
+ schema: Schema,
+ evaluator: ResidualEvaluator,
+ ) -> None:
+ """Cache a ResidualEvaluator instance.
+
+ Args:
+ spec: Partition specification.
+ expr: Filter expression.
+ case_sensitive: Case sensitivity flag.
+ schema: Table schema.
+ evaluator: ResidualEvaluator to cache.
+ """
+ cache_key = self._make_key(spec.spec_id, expr, case_sensitive,
schema.schema_id)
+ with self._lock:
+ if len(self._cache) >= self._maxsize:
+ oldest_key = next(iter(self._cache))
+ del self._cache[oldest_key]
+ self._cache[cache_key] = evaluator
+
+ def clear(self) -> None:
+ """Clear all cached evaluators."""
+ with self._lock:
+ self._cache.clear()
+
+
+_residual_evaluator_cache = ResidualEvaluatorCache()
+
+
def residual_evaluator_of(
spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool,
schema: Schema
) -> ResidualEvaluator:
- return (
- UnpartitionedResidualEvaluator(schema=schema, expr=expr)
- if spec.is_unpartitioned()
- else ResidualEvaluator(spec=spec, expr=expr, schema=schema,
case_sensitive=case_sensitive)
- )
+ cached = _residual_evaluator_cache.get(spec, expr, case_sensitive, schema)
+ if cached is not None:
+ return cached
Review Comment:
@jayceslesar thanks for the review!! Glad you're back in the saddle!
@Fokko what do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]