Erigara commented on code in PR #2029:
URL: https://github.com/apache/iceberg-python/pull/2029#discussion_r2100859083


##########
pyiceberg/expressions/visitors.py:
##########
@@ -894,12 +895,17 @@ def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpr
 
     def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
         file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id)
+        field_name = predicate.term.ref().field.name
 
         if file_column_name is None:
             # In the case of schema evolution, the column might not be present
             # in the file schema when reading older data
             if isinstance(predicate, BoundIsNull):
                 return AlwaysTrue()
+            # Projected fields are only available for identity partition fields,
+            # which means that partition pruning has already excluded partition values for which the predicate evaluates to false
+            elif field_name in self.projected_missing_fields:
+                return AlwaysTrue()

Review Comment:
   @Fokko here is as small an example as I could produce (it is still quite big in terms of LoC due to the setup).
   
   Basically, it works with the following Iceberg table:
   
   | col1 | col2 |
   | ---- | ---- |
   |   1    |   1   |
   |   1    |   2   |
   |   2    |   1   |
   |   2    |   2   |
   
   Here `col1` is used for partitioning and is absent from the Parquet data files.
   
   So the physical layout is the following:
   - `col1 = 1 -> warehouse/1.parquet`
   - `col1 = 2 -> warehouse/2.parquet`
   
   Both files `1.parquet` and `2.parquet` are identical, with the following structure: `{'col2': [1, 2]}`.
   
   
   ```python
   #!/usr/bin/env python
   import pyarrow as pa
   import pyarrow.parquet as pq
   from pyiceberg import expressions as expr
   from pyiceberg.catalog import load_catalog
   from pyiceberg.partitioning import PartitionSpec, PartitionField
   from pyiceberg.io.pyarrow import parquet_file_to_data_file
   from pyiceberg.transforms import IdentityTransform
   from pyiceberg.schema import Schema
   from pyiceberg.table.name_mapping import NameMapping, MappedField
   from pyiceberg.types import (
       NestedField,
       LongType,
   )
   
   catalog = load_catalog(
       "default",
       **{
           "type": "in-memory",
           "warehouse": "warehouse/",
       }
   )
   
   catalog.create_namespace("default")
   
   # create iceberg table
   schema = Schema(
       NestedField(field_id=1, name="col1", field_type=LongType()),
       NestedField(field_id=2, name="col2", field_type=LongType()),
   )
   mapping = NameMapping([
       MappedField(field_id=1, names=["col1"]),
       MappedField(field_id=2, names=["col2"]),
   ])
   table = catalog.create_table_if_not_exists(
       "default.table",
       schema=schema,
       partition_spec=PartitionSpec(
           PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="col1"),
       ),
       properties={"schema.name-mapping.default": mapping.model_dump_json()},
   )
   
   # write 2 parquet files, one for each col1 partition value (1 and 2)
   file_1 = "warehouse/1.parquet"
   file_2 = "warehouse/2.parquet"
   df = pa.Table.from_arrays([[1, 2]], names=["col2"])
   pq.write_table(df, file_1)
   pq.write_table(df, file_2)
   
   # add the files to the iceberg table, assigning each one its partition value
   with table.transaction() as tx:
       with tx.update_snapshot().fast_append() as fast_append:
           data_file_1 = parquet_file_to_data_file(tx._table.io, tx.table_metadata, file_1)
           data_file_1.partition[0] = 1
           fast_append.append_data_file(data_file_1)
           data_file_2 = parquet_file_to_data_file(tx._table.io, tx.table_metadata, file_2)
           data_file_2.partition[0] = 2
           fast_append.append_data_file(data_file_2)
   
   expr_1 = expr.And(expr.EqualTo("col1", 1), expr.EqualTo("col2", 1))
   scan_1 = table.scan(row_filter=expr_1)
   len_1 = len(scan_1.to_arrow())
   assert len_1 == 1
   
   expr_2 = expr.And(expr.EqualTo("col1", 2), expr.EqualTo("col2", 2))
   scan_2 = table.scan(row_filter=expr_2)
   len_2 = len(scan_2.to_arrow())
   assert len_2 == 1
   
   expr_3 = expr.Or(expr_1, expr_2)
   scan_3 = table.scan(row_filter=expr_3)
   len_3 = len(scan_3.to_arrow())
   assert len_3 == len_1 + len_2
   ```
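   
   If it helps, here is a small optional sketch (not part of the reproduction above) that prints which data files each scan plans after partition pruning; it assumes the `table`, `expr_1`, `expr_2` and `expr_3` objects defined in the example:
   
   ```python
   # Optional sketch: list the data files planned for each row filter after partition pruning.
   # Assumes `table`, `expr_1`, `expr_2` and `expr_3` from the example above.
   for name, row_filter in [("expr_1", expr_1), ("expr_2", expr_2), ("expr_3", expr_3)]:
       tasks = table.scan(row_filter=row_filter).plan_files()
       print(name, sorted(task.file.file_path for task in tasks))
   ```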


