Fokko commented on code in PR #1388:
URL: https://github.com/apache/iceberg-python/pull/1388#discussion_r1944741812


##########
pyiceberg/expressions/visitors.py:
##########
@@ -1731,3 +1732,230 @@ def _can_contain_nulls(self, field_id: int) -> bool:
 
     def _can_contain_nans(self, field_id: int) -> bool:
         return (nan_count := self.nan_counts.get(field_id)) is not None and 
nan_count > 0
+
+
+class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
+    """Finds the residuals for an Expression the partitions in the given 
PartitionSpec.
+
+    A residual expression is made by partially evaluating an expression using 
partition values.
+    For example, if a table is partitioned by day(utc_timestamp) and is read 
with a filter expression
+    utc_timestamp >= a and utc_timestamp <= b, then there are 4 possible 
residuals expressions
+    for the partition data, d:
+
+
+    1. If d > day(a) and d < day(b), the residual is always true
+    2. If d == day(a) and d != day(b), the residual is utc_timestamp >= a
+    3. if d == day(b) and d != day(a), the residual is utc_timestamp <= b
+    4. If d == day(a) == day(b), the residual is utc_timestamp >= a and 
utc_timestamp <= b
+
+    Partition data is passed using StructLike. Residuals are returned by 
residualFor(StructLike).
+
+    This class is thread-safe.
+    """
+
+    schema: Schema
+    spec: PartitionSpec
+    case_sensitive: bool
+    expr: BooleanExpression
+
+    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: 
bool, expr: BooleanExpression) -> None:
+        self.schema = schema
+        self.spec = spec
+        self.case_sensitive = case_sensitive
+        self.expr = expr
+
+    def eval(self, partition_data: Record) -> BooleanExpression:
+        self.struct = partition_data
+        return visit(self.expr, visitor=self)
+
+    def visit_true(self) -> BooleanExpression:
+        return AlwaysTrue()
+
+    def visit_false(self) -> BooleanExpression:
+        return AlwaysFalse()
+
+    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
+        return Not(child_result)
+
+    def visit_and(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return And(left_result, right_result)
+
+    def visit_or(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return Or(left_result, right_result)
+
+    def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is not None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if isinstance(val, SupportsFloat) and math.isnan(val):
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if isinstance(val, SupportsFloat) and not math.isnan(val):
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) < literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> BooleanExpression:
+        if term.eval(self.struct) <= literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) > literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> BooleanExpression:
+        if term.eval(self.struct) >= literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) == literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) != literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) in literals:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) not in literals:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        eval_res = term.eval(self.struct)
+        if eval_res is not None and 
str(eval_res).startswith(str(literal.value)):
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) 
-> BooleanExpression:
+        if not self.visit_starts_with(term, literal):
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> 
BooleanExpression:
+        """
+        If there is no strict projection or if it evaluates to false, then 
return the predicate.
+
+        Get the strict projection and inclusive projection of this predicate 
in partition data,
+        then use them to determine whether to return the original predicate. 
The strict projection
+        returns true iff the original predicate would have returned true, so 
the predicate can be
+        eliminated if the strict projection evaluates to true. Similarly the 
inclusive projection
+        returns false iff the original predicate would have returned false, so 
the predicate can
+        also be eliminated if the inclusive projection evaluates to false.
+
+        """
+        parts = 
self.spec.fields_by_source_id(predicate.term.ref().field.field_id)
+        if parts == []:
+            return predicate
+
+        def struct_to_schema(struct: StructType) -> Schema:
+            return Schema(*struct.fields)
+
+        for part in parts:
+            strict_projection = part.transform.strict_project(part.name, 
predicate)
+            strict_result = None
+
+            if strict_projection is not None:
+                bound = 
strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
+                if isinstance(bound, BoundPredicate):
+                    strict_result = super().visit_bound_predicate(bound)
+                else:
+                    # if the result is not a predicate, then it must be a 
constant like alwaysTrue or alwaysFalse
+                    strict_result = bound
+
+            if strict_result is not None and isinstance(strict_result, 
AlwaysTrue):
+                return AlwaysTrue()
+
+            inclusive_projection = part.transform.project(part.name, predicate)
+            inclusive_result = None
+            if inclusive_projection is not None:
+                bound_inclusive = 
inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))

Review Comment:
   ```suggestion
                   bound_inclusive = 
inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)),
 case_sensitive=self.case_sensitive)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to