Fokko commented on code in PR #1388:
URL: https://github.com/apache/iceberg-python/pull/1388#discussion_r1913339866


##########
pyiceberg/expressions/visitors.py:
##########
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool:
 
     def _can_contain_nans(self, field_id: int) -> bool:
         return (nan_count := self.nan_counts.get(field_id)) is not None and 
nan_count > 0
+
+
+class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
+    schema: Schema
+    spec: PartitionSpec
+    case_sensitive: bool
+
+    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: 
bool, expr: BooleanExpression):
+        self.schema = schema
+        self.spec = spec
+        self.case_sensitive = case_sensitive
+        self.expr = expr
+
+    def eval(self, partition_data: Record) -> BooleanExpression:
+        self.struct = partition_data
+        return visit(self.expr, visitor=self)
+
+    def visit_true(self) -> BooleanExpression:
+        return AlwaysTrue()
+
+    def visit_false(self) -> BooleanExpression:
+        return AlwaysFalse()
+
+    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
+        return Not(child_result)
+
+    def visit_and(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return And(left_result, right_result)
+
+    def visit_or(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return Or(left_result, right_result)
+
+    def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is not None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is None:
+            return self.visit_true()

Review Comment:
   Similar to Java, I think we can return `AlwaysTrue` directly, instead of 
calling `visit_true()`: 
https://github.com/apache/iceberg/blob/5fd16b5bfeb85e12b5a9ecb4e39504389d7b72ed/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java#L157
   
   
   ```suggestion
               return AlwaysTrue()
   ```



##########
pyiceberg/expressions/visitors.py:
##########
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool:
 
     def _can_contain_nans(self, field_id: int) -> bool:
         return (nan_count := self.nan_counts.get(field_id)) is not None and 
nan_count > 0
+
+
+class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):

Review Comment:
   Can we copy this comment?
   
   
https://github.com/apache/iceberg/blob/5fd16b5bfeb85e12b5a9ecb4e39504389d7b72ed/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java#L32
   
   I think that would be helpful for the reader.



##########
pyiceberg/expressions/visitors.py:
##########
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool:
 
     def _can_contain_nans(self, field_id: int) -> bool:
         return (nan_count := self.nan_counts.get(field_id)) is not None and 
nan_count > 0
+
+
+class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
+    schema: Schema
+    spec: PartitionSpec
+    case_sensitive: bool
+
+    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: 
bool, expr: BooleanExpression):
+        self.schema = schema
+        self.spec = spec
+        self.case_sensitive = case_sensitive
+        self.expr = expr
+
+    def eval(self, partition_data: Record) -> BooleanExpression:
+        self.struct = partition_data
+        return visit(self.expr, visitor=self)
+
+    def visit_true(self) -> BooleanExpression:
+        return AlwaysTrue()
+
+    def visit_false(self) -> BooleanExpression:
+        return AlwaysFalse()
+
+    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
+        return Not(child_result)
+
+    def visit_and(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return And(left_result, right_result)
+
+    def visit_or(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return Or(left_result, right_result)
+
+    def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is not None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is None:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is not None:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) < literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> BooleanExpression:
+        if term.eval(self.struct) <= literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) > literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> BooleanExpression:
+        if term.eval(self.struct) >= literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) == literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) != literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) in literals:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) not in literals:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        eval_res = term.eval(self.struct)
+        if eval_res is not None and 
str(eval_res).startswith(str(literal.value)):
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) 
-> BooleanExpression:
+        if not self.visit_starts_with(term, literal):
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> 
BooleanExpression:
+        """
+        If there is no strict projection or if it evaluates to false, then 
return the predicate.
+
+        Get the strict projection and inclusive projection of this predicate 
in partition data,
+        then use them to determine whether to return the original predicate. 
The strict projection
+        returns true iff the original predicate would have returned true, so 
the predicate can be
+        eliminated if the strict projection evaluates to true. Similarly the 
inclusive projection
+        returns false iff the original predicate would have returned false, so 
the predicate can
+        also be eliminated if the inclusive projection evaluates to false.
+
+        """
+        parts = 
self.spec.fields_by_source_id(predicate.term.ref().field.field_id)
+        if parts == []:
+            return predicate
+
+        from pyiceberg.types import StructType

Review Comment:
   Let's move this import to the top



##########
pyiceberg/expressions/visitors.py:
##########
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool:
 
     def _can_contain_nans(self, field_id: int) -> bool:
         return (nan_count := self.nan_counts.get(field_id)) is not None and 
nan_count > 0
+
+
+class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
+    schema: Schema
+    spec: PartitionSpec
+    case_sensitive: bool
+
+    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: 
bool, expr: BooleanExpression):
+        self.schema = schema
+        self.spec = spec
+        self.case_sensitive = case_sensitive
+        self.expr = expr
+
+    def eval(self, partition_data: Record) -> BooleanExpression:
+        self.struct = partition_data
+        return visit(self.expr, visitor=self)
+
+    def visit_true(self) -> BooleanExpression:
+        return AlwaysTrue()
+
+    def visit_false(self) -> BooleanExpression:
+        return AlwaysFalse()
+
+    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
+        return Not(child_result)
+
+    def visit_and(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return And(left_result, right_result)
+
+    def visit_or(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return Or(left_result, right_result)
+
+    def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is not None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is None:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is not None:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) < literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> BooleanExpression:
+        if term.eval(self.struct) <= literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) > literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> BooleanExpression:
+        if term.eval(self.struct) >= literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) == literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) != literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) in literals:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) not in literals:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        eval_res = term.eval(self.struct)
+        if eval_res is not None and 
str(eval_res).startswith(str(literal.value)):
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) 
-> BooleanExpression:
+        if not self.visit_starts_with(term, literal):
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> 
BooleanExpression:
+        """
+        If there is no strict projection or if it evaluates to false, then 
return the predicate.
+
+        Get the strict projection and inclusive projection of this predicate 
in partition data,
+        then use them to determine whether to return the original predicate. 
The strict projection
+        returns true iff the original predicate would have returned true, so 
the predicate can be
+        eliminated if the strict projection evaluates to true. Similarly the 
inclusive projection
+        returns false iff the original predicate would have returned false, so 
the predicate can
+        also be eliminated if the inclusive projection evaluates to false.
+
+        """
+        parts = 
self.spec.fields_by_source_id(predicate.term.ref().field.field_id)
+        if parts == []:
+            return predicate
+
+        from pyiceberg.types import StructType
+
+        def struct_to_schema(struct: StructType) -> Schema:
+            return Schema(*list(struct.fields))
+
+        for part in parts:
+            strict_projection = part.transform.strict_project(part.name, 
predicate)
+            strict_result = None
+
+            if strict_projection is not None:
+                bound = 
strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
+                if isinstance(bound, BoundPredicate):
+                    strict_result = super().visit_bound_predicate(bound)
+                else:
+                    strict_result = bound
+
+            if strict_result is not None and isinstance(strict_result, 
AlwaysTrue):
+                return AlwaysTrue()
+
+            inclusive_projection = part.transform.project(part.name, predicate)
+            inclusive_result = None
+            if inclusive_projection is not None:
+                bound_inclusive = 
inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
+                if isinstance(bound_inclusive, BoundPredicate):
+                    # using predicate method specific to inclusive
+                    inclusive_result = 
super().visit_bound_predicate(bound_inclusive)
+                else:
+                    # if the result is not a predicate, then it must be a 
constant like alwaysTrue or
+                    # alwaysFalse
+                    inclusive_result = bound_inclusive
+            if inclusive_result is not None and isinstance(inclusive_result, 
AlwaysFalse):
+                return AlwaysFalse()
+
+        return predicate
+
+    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> 
BooleanExpression:
+        bound = predicate.bind(self.schema, case_sensitive=True)
+
+        if isinstance(bound, BoundPredicate):
+            bound_residual = self.visit_bound_predicate(predicate=bound)
+            # if isinstance(bound_residual, BooleanExpression):
+            if bound_residual not in (AlwaysFalse(), AlwaysTrue()):
+                # replace inclusive original unbound predicate
+                return predicate
+
+            # use the non-predicate residual (e.g. alwaysTrue)
+            return bound_residual
+
+        # if binding didn't result in a Predicate, return the expression
+        return bound
+
+
+class ResidualEvaluator(ResidualVisitor):
+    def residual_for(self, partition_data: Record) -> BooleanExpression:
+        return self.eval(partition_data)
+
+
+class UnpartitionedResidualEvaluator(ResidualEvaluator):
+    # Finds the residuals for an Expression the partitions in the given 
PartitionSpec
+    def __init__(self, schema: Schema, expr: BooleanExpression):
+        from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC

Review Comment:
   Let's move this import to the top as well 👍 



##########
pyiceberg/table/__init__.py:
##########
@@ -1522,8 +1555,9 @@ def plan_files(self) -> Iterable[FileScanTask]:
                     data_entry,
                     positional_delete_entries,
                 ),
+                residual=residual,

Review Comment:
   Instead of carrying the residuals all the way through, I think we can 
compute them at this point instead:
   ```suggestion
                   
residual=residual_evaluators[manifest.partition_spec_id](data_entry.data_file.partition),
   ```
   If a whole manifest is rejected, we don't even compute the evaluator itself.



##########
pyiceberg/table/__init__.py:
##########
@@ -1596,6 +1630,43 @@ def to_ray(self) -> ray.data.dataset.Dataset:
 
         return ray.data.from_arrow(self.to_arrow())
 
+    def count(self) -> int:
+        # Usage: Calculates the total number of records in a Scan that haven't 
had positional deletes.
+        res = 0
+        # every task is a FileScanTask
+        tasks = self.plan_files()
+
+        for task in tasks:
+            # task.residual is a Boolean Expression if the filter condition is 
fully satisfied by the
+            # partition value and task.delete_files represents that positional 
delete haven't been merged yet
+            # hence those files have to read as a pyarrow table applying the 
filter and deletes
+            if task.residual == AlwaysTrue() and not len(task.delete_files):
+                # Every File has a metadata stat that stores the file record 
count
+                res += task.file.record_count
+            else:

Review Comment:
   I think the positional deletes are missing from the `else:` branch. How 
about re-using `_task_to_record_batches` for now?



##########
pyiceberg/expressions/visitors.py:
##########
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool:
 
     def _can_contain_nans(self, field_id: int) -> bool:
         return (nan_count := self.nan_counts.get(field_id)) is not None and 
nan_count > 0
+
+
+class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
+    schema: Schema
+    spec: PartitionSpec
+    case_sensitive: bool
+

Review Comment:
   Can we add:
   
   ```
   expr: BooleanExpression
   ```
   
   to the class variables as well?



##########
pyiceberg/table/__init__.py:
##########
@@ -1436,6 +1443,27 @@ def _build_partition_evaluator(self, spec_id: int) -> 
Callable[[DataFile], bool]
         # shared instance across multiple threads.
         return lambda data_file: expression_evaluator(partition_schema, 
partition_expr, self.case_sensitive)(data_file.partition)
 
+    from pyiceberg.expressions.visitors import ResidualEvaluator
+
+    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], 
ResidualEvaluator]:
+        spec = self.table_metadata.specs()[spec_id]
+
+        # The lambda created here is run in multiple threads.
+        # So we avoid creating _EvaluatorExpression methods bound to a single
+        # shared instance across multiple threads.
+        # return lambda data_file: (partition_schema, partition_expr, 
self.case_sensitive)(data_file.partition)
+        from pyiceberg.expressions.visitors import residual_evaluator_of
+
+        # assert self.row_filter == False
+        return lambda datafile: (
+            residual_evaluator_of(
+                spec=spec,
+                expr=self.row_filter,
+                case_sensitive=self.case_sensitive,
+                schema=self.table_metadata.schema(),

Review Comment:
   In a separate PR we want to refactor this, where we use the same schema as 
in `projection()`



##########
pyiceberg/table/__init__.py:
##########
@@ -1343,33 +1346,37 @@ class FileScanTask(ScanTask):
     delete_files: Set[DataFile]
     start: int
     length: int
+    residual: BooleanExpression
 
     def __init__(
         self,
         data_file: DataFile,
         delete_files: Optional[Set[DataFile]] = None,
         start: Optional[int] = None,
         length: Optional[int] = None,
+        residual: Optional[BooleanExpression] = None,

Review Comment:
   I think we should make this one required. If there is no residual, it should 
default to `AlwaysTrue()`.



##########
pyiceberg/expressions/visitors.py:
##########
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool:
 
     def _can_contain_nans(self, field_id: int) -> bool:
         return (nan_count := self.nan_counts.get(field_id)) is not None and 
nan_count > 0
+
+
+class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
+    schema: Schema
+    spec: PartitionSpec
+    case_sensitive: bool
+
+    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: 
bool, expr: BooleanExpression):
+        self.schema = schema
+        self.spec = spec
+        self.case_sensitive = case_sensitive
+        self.expr = expr
+
+    def eval(self, partition_data: Record) -> BooleanExpression:
+        self.struct = partition_data
+        return visit(self.expr, visitor=self)
+
+    def visit_true(self) -> BooleanExpression:
+        return AlwaysTrue()
+
+    def visit_false(self) -> BooleanExpression:
+        return AlwaysFalse()
+
+    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
+        return Not(child_result)
+
+    def visit_and(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return And(left_result, right_result)
+
+    def visit_or(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return Or(left_result, right_result)
+
+    def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is not None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is None:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is not None:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) < literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> BooleanExpression:
+        if term.eval(self.struct) <= literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) > literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> BooleanExpression:
+        if term.eval(self.struct) >= literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) == literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) != literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) in literals:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) not in literals:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        eval_res = term.eval(self.struct)
+        if eval_res is not None and 
str(eval_res).startswith(str(literal.value)):
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) 
-> BooleanExpression:
+        if not self.visit_starts_with(term, literal):
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> 
BooleanExpression:
+        """
+        If there is no strict projection or if it evaluates to false, then 
return the predicate.
+
+        Get the strict projection and inclusive projection of this predicate 
in partition data,
+        then use them to determine whether to return the original predicate. 
The strict projection
+        returns true iff the original predicate would have returned true, so 
the predicate can be
+        eliminated if the strict projection evaluates to true. Similarly the 
inclusive projection
+        returns false iff the original predicate would have returned false, so 
the predicate can
+        also be eliminated if the inclusive projection evaluates to false.
+
+        """
+        parts = 
self.spec.fields_by_source_id(predicate.term.ref().field.field_id)
+        if parts == []:
+            return predicate
+
+        from pyiceberg.types import StructType
+
+        def struct_to_schema(struct: StructType) -> Schema:
+            return Schema(*list(struct.fields))
+
+        for part in parts:
+            strict_projection = part.transform.strict_project(part.name, 
predicate)
+            strict_result = None
+
+            if strict_projection is not None:
+                bound = 
strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
+                if isinstance(bound, BoundPredicate):
+                    strict_result = super().visit_bound_predicate(bound)
+                else:
+                    strict_result = bound

Review Comment:
   Let's keep the comments from Java in here, I think they are pretty helpful:
   ```suggestion
                       # if the result is not a predicate, then it must be a 
constant like alwaysTrue or alwaysFalse
                       strict_result = bound
   ```



##########
pyiceberg/expressions/visitors.py:
##########
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool:
 
     def _can_contain_nans(self, field_id: int) -> bool:
         return (nan_count := self.nan_counts.get(field_id)) is not None and 
nan_count > 0
+
+
+class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
+    schema: Schema
+    spec: PartitionSpec
+    case_sensitive: bool
+
+    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: 
bool, expr: BooleanExpression):
+        self.schema = schema
+        self.spec = spec
+        self.case_sensitive = case_sensitive
+        self.expr = expr
+
+    def eval(self, partition_data: Record) -> BooleanExpression:
+        self.struct = partition_data
+        return visit(self.expr, visitor=self)
+
+    def visit_true(self) -> BooleanExpression:
+        return AlwaysTrue()
+
+    def visit_false(self) -> BooleanExpression:
+        return AlwaysFalse()
+
+    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
+        return Not(child_result)
+
+    def visit_and(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return And(left_result, right_result)
+
+    def visit_or(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return Or(left_result, right_result)
+
+    def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is not None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is None:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is not None:
+            return self.visit_true()
+        else:
+            return self.visit_false()

Review Comment:
   Java takes a different approach and checks for `NaN`:
   ```suggestion
           if isnan(term.eval(self.struct)):
               return self.visit_true()
           else:
               return self.visit_false()
   ```



##########
pyiceberg/expressions/visitors.py:
##########
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool:
 
     def _can_contain_nans(self, field_id: int) -> bool:
         return (nan_count := self.nan_counts.get(field_id)) is not None and 
nan_count > 0
+
+
+class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
+    schema: Schema
+    spec: PartitionSpec
+    case_sensitive: bool
+
+    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: 
bool, expr: BooleanExpression):
+        self.schema = schema
+        self.spec = spec
+        self.case_sensitive = case_sensitive
+        self.expr = expr
+
+    def eval(self, partition_data: Record) -> BooleanExpression:
+        self.struct = partition_data
+        return visit(self.expr, visitor=self)
+
+    def visit_true(self) -> BooleanExpression:
+        return AlwaysTrue()
+
+    def visit_false(self) -> BooleanExpression:
+        return AlwaysFalse()
+
+    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
+        return Not(child_result)
+
+    def visit_and(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return And(left_result, right_result)
+
+    def visit_or(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+        return Or(left_result, right_result)
+
+    def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
+        if term.eval(self.struct) is not None:
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is None:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
+        val = term.eval(self.struct)
+        if val is not None:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) < literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> BooleanExpression:
+        if term.eval(self.struct) <= literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) > literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> BooleanExpression:
+        if term.eval(self.struct) >= literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) == literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) != literal.value:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) in literals:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> 
BooleanExpression:
+        if term.eval(self.struct) not in literals:
+            return self.visit_true()
+        else:
+            return self.visit_false()
+
+    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> 
BooleanExpression:
+        eval_res = term.eval(self.struct)
+        if eval_res is not None and 
str(eval_res).startswith(str(literal.value)):
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) 
-> BooleanExpression:
+        if not self.visit_starts_with(term, literal):
+            return AlwaysTrue()
+        else:
+            return AlwaysFalse()
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> 
BooleanExpression:
+        """
+        If there is no strict projection or if it evaluates to false, then 
return the predicate.
+
+        Get the strict projection and inclusive projection of this predicate 
in partition data,
+        then use them to determine whether to return the original predicate. 
The strict projection
+        returns true iff the original predicate would have returned true, so 
the predicate can be
+        eliminated if the strict projection evaluates to true. Similarly the 
inclusive projection
+        returns false iff the original predicate would have returned false, so 
the predicate can
+        also be eliminated if the inclusive projection evaluates to false.
+
+        """
+        parts = 
self.spec.fields_by_source_id(predicate.term.ref().field.field_id)
+        if parts == []:
+            return predicate
+
+        from pyiceberg.types import StructType
+
+        def struct_to_schema(struct: StructType) -> Schema:
+            return Schema(*list(struct.fields))

Review Comment:
   The conversion to a list is not needed:
   
   ```suggestion
               return Schema(*struct.fields)
   ```
   
   ```
   python3
   Python 3.10.14 (main, Mar 19 2024, 21:46:16) [Clang 15.0.0 
(clang-1500.3.9.4)] on darwin
   Type "help", "copyright", "credits" or "license" for more information.
   >>> def vo(*int):
   ...     print(int)
   ... 
   >>> vo(*(1,2,3))
   (1, 2, 3)
   ```



##########
pyiceberg/expressions/visitors.py:
##########
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool:
 
     def _can_contain_nans(self, field_id: int) -> bool:
         return (nan_count := self.nan_counts.get(field_id)) is not None and 
nan_count > 0
+
+
+class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
+    schema: Schema
+    spec: PartitionSpec
+    case_sensitive: bool
+
+    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: 
bool, expr: BooleanExpression):

Review Comment:
   ```suggestion
       def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: 
bool, expr: BooleanExpression) -> None:
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -1466,6 +1494,9 @@ def plan_files(self) -> Iterable[FileScanTask]:
         # the filter depends on the partition spec used to write the manifest 
file, so create a cache of filters for each spec id
 
         manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = 
KeyDefaultDict(self._build_manifest_evaluator)
+        from pyiceberg.expressions.visitors import ResidualEvaluator

Review Comment:
   I think we can move this import to the top as well



##########
pyiceberg/table/__init__.py:
##########
@@ -1596,6 +1630,43 @@ def to_ray(self) -> ray.data.dataset.Dataset:
 
         return ray.data.from_arrow(self.to_arrow())
 
+    def count(self) -> int:
+        # Usage: Calculates the total number of records in a Scan that haven't 
had positional deletes.
+        res = 0
+        # every task is a FileScanTask
+        tasks = self.plan_files()
+
+        for task in tasks:
+            # task.residual is a Boolean Expression if the filter condition is 
fully satisfied by the
+            # partition value and task.delete_files represents that positional 
delete haven't been merged yet
+            # hence those files have to read as a pyarrow table applying the 
filter and deletes
+            if task.residual == AlwaysTrue() and not len(task.delete_files):
+                # Every File has a metadata stat that stores the file record 
count
+                res += task.file.record_count
+            else:
+                from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+
+                arrow_scan = ArrowScan(
+                    table_metadata=self.table_metadata,
+                    io=self.io,
+                    projected_schema=self.projection(),

Review Comment:
   We are not interested in any fields at all, so we could just pass in an 
empty schema. This will also relax @gli-chris-hao his concern regarding memory 
pressure 👍 



##########
pyiceberg/table/__init__.py:
##########
@@ -1596,6 +1630,43 @@ def to_ray(self) -> ray.data.dataset.Dataset:
 
         return ray.data.from_arrow(self.to_arrow())
 
+    def count(self) -> int:
+        # Usage: Calculates the total number of records in a Scan that haven't 
had positional deletes.
+        res = 0
+        # every task is a FileScanTask
+        tasks = self.plan_files()
+
+        for task in tasks:
+            # task.residual is a Boolean Expression if the filter condition is 
fully satisfied by the
+            # partition value and task.delete_files represents that positional 
delete haven't been merged yet
+            # hence those files have to read as a pyarrow table applying the 
filter and deletes
+            if task.residual == AlwaysTrue() and not len(task.delete_files):
+                # Every File has a metadata stat that stores the file record 
count
+                res += task.file.record_count
+            else:
+                from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+
+                arrow_scan = ArrowScan(
+                    table_metadata=self.table_metadata,
+                    io=self.io,
+                    projected_schema=self.projection(),
+                    row_filter=self.row_filter,
+                    case_sensitive=self.case_sensitive,
+                    limit=self.limit,

Review Comment:
   This will lead to incorrect results since we don't want to limit the count
   ```suggestion
                       limit=self.limit,
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -1596,6 +1630,43 @@ def to_ray(self) -> ray.data.dataset.Dataset:
 
         return ray.data.from_arrow(self.to_arrow())
 
+    def count(self) -> int:
+        # Usage: Calculates the total number of records in a Scan that haven't 
had positional deletes.
+        res = 0
+        # every task is a FileScanTask
+        tasks = self.plan_files()
+
+        for task in tasks:
+            # task.residual is a Boolean Expression if the filter condition is 
fully satisfied by the
+            # partition value and task.delete_files represents that positional 
delete haven't been merged yet
+            # hence those files have to read as a pyarrow table applying the 
filter and deletes
+            if task.residual == AlwaysTrue() and not len(task.delete_files):

Review Comment:
   I think this is a bit more explicit:
   ```suggestion
               if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to