Erigara commented on code in PR #2029:
URL: https://github.com/apache/iceberg-python/pull/2029#discussion_r2236597985
##########
pyiceberg/expressions/visitors.py:
##########
@@ -930,6 +930,82 @@ def translate_column_names(expr: BooleanExpression,
file_schema: Schema, case_se
return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive))
+class _ProjectedColumnsEvaluator(BooleanExpressionVisitor[BooleanExpression]):
+ """Evaluated predicates which involve projected columns missing from the
file.
+
+ Args:
+ file_schema (Schema): The schema of the file.
+ projected_schema (Schema): The schema to project onto the data files.
+ case_sensitive (bool): Whether to consider case when binding a reference
to a field in a schema, defaults to True.
+ projected_missing_fields(dict[str, Any]): Map of fields missing in
file_schema, but present as partition values.
+
+ Raises:
+ TypeError: In the case of an UnboundPredicate.
+ """
+
+ file_schema: Schema
+ case_sensitive: bool
+
+ def __init__(
+ self, file_schema: Schema, projected_schema: Schema, case_sensitive:
bool, projected_missing_fields: dict[str, Any]
+ ) -> None:
+ self.file_schema = file_schema
+ self.projected_schema = projected_schema
+ self.case_sensitive = case_sensitive
+ self.projected_missing_fields = projected_missing_fields
+
+ def visit_true(self) -> BooleanExpression:
+ return AlwaysTrue()
+
+ def visit_false(self) -> BooleanExpression:
+ return AlwaysFalse()
+
+ def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
+ return Not(child=child_result)
+
+ def visit_and(self, left_result: BooleanExpression, right_result:
BooleanExpression) -> BooleanExpression:
+ return And(left=left_result, right=right_result)
+
+ def visit_or(self, left_result: BooleanExpression, right_result:
BooleanExpression) -> BooleanExpression:
+ return Or(left=left_result, right=right_result)
+
+ def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) ->
BooleanExpression:
+ raise TypeError(f"Expected Bound Predicate, got: {predicate.term}")
+
+ def visit_bound_predicate(self, predicate: BoundPredicate[L]) ->
BooleanExpression:
+ file_column_name =
self.file_schema.find_column_name(predicate.term.ref().field.field_id)
+
+ if file_column_name is None and (field_name :=
predicate.term.ref().field.name) in self.projected_missing_fields:
+ unbound_predicate: BooleanExpression
+ if isinstance(predicate, BoundUnaryPredicate):
+ unbound_predicate = predicate.as_unbound(field_name)
+ elif isinstance(predicate, BoundLiteralPredicate):
+ unbound_predicate = predicate.as_unbound(field_name,
predicate.literal)
+ elif isinstance(predicate, BoundSetPredicate):
+ unbound_predicate = predicate.as_unbound(field_name,
predicate.literals)
+ else:
+ raise ValueError(f"Unsupported predicate: {predicate}")
+ field = self.projected_schema.find_field(field_name)
+ schema = Schema(field)
+ evaluator = expression_evaluator(schema, unbound_predicate,
self.case_sensitive)
+ if evaluator(Record(self.projected_missing_fields[field_name])):
Review Comment:
Because you
[asked](https://github.com/apache/iceberg-python/pull/2029#discussion_r2173819438)
me to do that. Initially it was in `_ColumnNameTranslator` to get the job done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]