Erigara commented on code in PR #2029:
URL: https://github.com/apache/iceberg-python/pull/2029#discussion_r2100082961


##########
pyiceberg/expressions/visitors.py:
##########
@@ -894,12 +895,17 @@ def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpr

     def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
         file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id)
+        field_name = predicate.term.ref().field.name

         if file_column_name is None:
             # In the case of schema evolution, the column might not be present
             # in the file schema when reading older data
             if isinstance(predicate, BoundIsNull):
                 return AlwaysTrue()
+            # Projected fields are only available for identity partition fields,
+            # which means partition pruning already excluded files whose partition value evaluates to false
+            elif field_name in self.projected_missing_fields:
+                return AlwaysTrue()

Review Comment:
   Here is a script that illustrates my point: rewriting a missing projected partition field to `AlwaysTrue()` is unsound when the predicate sits under an `Or`, because each file's residual filter then accepts rows intended for the other partition (see the final check, where `len_3 != len_1 + len_2`):
   
   <details>
   
   <summary>bug.py</summary>
   
   ```python
   #!/usr/bin/env python
   import polars as pl
   from pyiceberg.catalog import load_catalog
   from pyiceberg.partitioning import PartitionSpec, PartitionField
   from pyiceberg import expressions as expr
   from pyiceberg.transforms import IdentityTransform
   from pyiceberg.table import _parquet_files_to_data_files
   from pyiceberg.table.name_mapping import NameMapping, MappedField
   from pyiceberg.io.pyarrow import pyarrow_to_schema
   
   catalog = load_catalog(
       "default",
       **{
           "type": "in-memory",
           "warehouse": "warehouse/",
       }
   )
   
   catalog.create_namespace_if_not_exists("default")
   
    df = pl.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet")
   
   # write filtered data 
   file_vendor_1 = "warehouse/VendorID=1_yellow_tripdata_2025-01.parquet"
   df.filter(pl.col("VendorID") == 
1).drop("VendorID").write_parquet(file_vendor_1)
   
   # create iceberg table
   schema = df.to_arrow().schema
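    # the raw parquet file carries no Iceberg field IDs, so build a name
    # mapping and register it on the table as schema.name-mapping.default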
    mapping = NameMapping([MappedField(field_id=i, names=[name]) for i, name in enumerate(schema.names, 1)])
   schema = pyarrow_to_schema(schema, mapping)
   table = catalog.create_table_if_not_exists(
       "default.taxi",
       schema=schema,
       partition_spec=PartitionSpec(
        PartitionField(
            source_id=schema.find_field("VendorID").field_id,
            field_id=schema.find_field("VendorID").field_id,
            transform=IdentityTransform(),
            name="VendorID",
        ),
       ),
       properties={"schema.name-mapping.default": mapping.model_dump_json()},
   )
   
   # add_files
   files = [file_vendor_1]
   with table.transaction() as tx:
       with tx.update_snapshot().fast_append() as fast_append:
           data_files = _parquet_files_to_data_files(
                table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
           )
           for data_file in data_files:
               # set partition for VendorID
               # current file has only one partition anyway
               # VendorID = 1
               data_file.partition[0] = 1
               fast_append.append_data_file(data_file)
   
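    # Only the VendorID=1 file exists so far, so partition pruning alone picks
    # the right file and the two scans below should return the same count.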
   # query with projected field in predicate
   scan = table.scan(row_filter=expr.And(
           expr.EqualTo("VendorID", 1),
            expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
           expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
       ))
   print(f"WITH PROJECTED COLUMN LEN: {len(scan.to_arrow())}")
   
   # query without projected field in predicate
   scan = table.scan(row_filter=expr.And(
            expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
           expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
       ))
   print(f"WITHOUT PROJECTED COLUM LEN: {len(scan.to_arrow())}")
   
   # add second partition
   file_vendor_2 = "warehouse/VendorID=2_yellow_tripdata_2025-01.parquet"
   df.filter(pl.col("VendorID") == 
2).drop("VendorID").write_parquet(file_vendor_2)
   
   # add_files
   files = [file_vendor_2]
   with table.transaction() as tx:
       with tx.update_snapshot().fast_append() as fast_append:
           data_files = _parquet_files_to_data_files(
                table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
           )
           for data_file in data_files:
               # set partition for VendorID
               # current file has only one partition anyway
               # VendorID = 2
               data_file.partition[0] = 2
               fast_append.append_data_file(data_file)
   
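    # With two partitions present, each file's residual filter matters:
    # the missing VendorID predicate must not be rewritten to AlwaysTrue.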
   expr_1 = expr.And(
       expr.EqualTo("VendorID", 1),
       expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
       expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
   )
   scan = table.scan(row_filter=expr_1)
   len_1 = len(scan.to_arrow())
   print(f"1: VendorID = 1 AND tpep_pickup_datetime BETWEEN 2025-01-01T12:00:00 
AND 2025-01-01T18:00:00: {len_1}")
   
   expr_2 = expr.And(
       expr.EqualTo("VendorID", 2),
       expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-02T12:00:00'),
       expr.LessThan("tpep_pickup_datetime", '2025-01-02T18:00:00'),
   )
   scan = table.scan(row_filter=expr_2)
   len_2 = len(scan.to_arrow())
   print(f"2: VendorID = 2 AND tpep_pickup_datetime BETWEEN 2025-01-02T12:00:00 
AND 2025-01-02T18:00:00: {len_2}")
   
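    # The key check: under the proposed rewrite, each file's residual accepts
    # BOTH timestamp ranges, so the Or of 1 and 2 returns more rows than
    # len_1 + len_2.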
   expr_3 = expr.Or(expr_1, expr_2)
   scan = table.scan(row_filter=expr_3)
   len_3 = len(scan.to_arrow())
   print(f"3: 1 OR 2: {len(scan.to_arrow())} (== {len_1 + len_2} is {len_3 == 
len_1 + len_2})")
   ```
   
   </details>
   


