Erigara commented on code in PR #2029: URL: https://github.com/apache/iceberg-python/pull/2029#discussion_r2100082961
########## pyiceberg/expressions/visitors.py: ########## @@ -894,12 +895,17 @@ def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpr def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression: file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id) + field_name = predicate.term.ref().field.name if file_column_name is None: # In the case of schema evolution, the column might not be present # in the file schema when reading older data if isinstance(predicate, BoundIsNull): return AlwaysTrue() + # Projected fields are only available for identity partition fields + # Which mean that partition pruning excluded partition field which evaluates to false + elif field_name in self.projected_missing_fields: + return AlwaysTrue() Review Comment: Here is the script which illustrates my point: <details> <summary>bug.py</summary> ```python #!/usr/bin/env python import polars as pl from pyiceberg.catalog import load_catalog from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg import expressions as expr from pyiceberg.transforms import IdentityTransform from pyiceberg.table import _parquet_files_to_data_files from pyiceberg.table.name_mapping import NameMapping, MappedField from pyiceberg.io.pyarrow import pyarrow_to_schema catalog = load_catalog( "default", **{ "type": "in-memory", "warehouse": "warehouse/", } ) catalog.create_namespace_if_not_exists("default") df = pl.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet") # write filtered data file_vendor_1 = "warehouse/VendorID=1_yellow_tripdata_2025-01.parquet" df.filter(pl.col("VendorID") == 1).drop("VendorID").write_parquet(file_vendor_1) # create iceberg table schema = df.to_arrow().schema mapping = NameMapping([MappedField(field_id=i,names=[name]) for i, name in enumerate(schema.names, 1)]) schema = pyarrow_to_schema(schema, mapping) table = 
catalog.create_table_if_not_exists( "default.taxi", schema=schema, partition_spec=PartitionSpec( PartitionField(source_id=schema.find_field("VendorID").field_id, field_id=schema.find_field("VendorID").field_id, transform=IdentityTransform(), name="VendorID"), ), properties={"schema.name-mapping.default": mapping.model_dump_json()}, ) # add_files files = [file_vendor_1] with table.transaction() as tx: with tx.update_snapshot().fast_append() as fast_append: data_files = _parquet_files_to_data_files( table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io ) for data_file in data_files: # set partition for VendorID # current file has only one partition anyway # VendorID = 1 data_file.partition[0] = 1 fast_append.append_data_file(data_file) # query with projected field in predicate scan = table.scan(row_filter=expr.And( expr.EqualTo("VendorID", 1), expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'), expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'), )) print(f"WITH PROJECTED COLUMN LEN: {len(scan.to_arrow())}") # query without projected field in predicate scan = table.scan(row_filter=expr.And( expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'), expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'), )) print(f"WITHOUT PROJECTED COLUM LEN: {len(scan.to_arrow())}") # add second partition file_vendor_2 = "warehouse/VendorID=2_yellow_tripdata_2025-01.parquet" df.filter(pl.col("VendorID") == 2).drop("VendorID").write_parquet(file_vendor_2) # add_files files = [file_vendor_2] with table.transaction() as tx: with tx.update_snapshot().fast_append() as fast_append: data_files = _parquet_files_to_data_files( table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io ) for data_file in data_files: # set partition for VendorID # current file has only one partition anyway # VendorID = 2 data_file.partition[0] = 2 fast_append.append_data_file(data_file) expr_1 = expr.And( expr.EqualTo("VendorID", 1), 
expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'), expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'), ) scan = table.scan(row_filter=expr_1) len_1 = len(scan.to_arrow()) print(f"1: VendorID = 1 AND tpep_pickup_datetime BETWEEN 2025-01-01T12:00:00 AND 2025-01-01T18:00:00: {len_1}") expr_2 = expr.And( expr.EqualTo("VendorID", 2), expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-02T12:00:00'), expr.LessThan("tpep_pickup_datetime", '2025-01-02T18:00:00'), ) scan = table.scan(row_filter=expr_2) len_2 = len(scan.to_arrow()) print(f"2: VendorID = 2 AND tpep_pickup_datetime BETWEEN 2025-01-02T12:00:00 AND 2025-01-02T18:00:00: {len_2}") expr_3 = expr.Or(expr_1, expr_2) scan = table.scan(row_filter=expr_3) len_3 = len(scan.to_arrow()) print(f"3: 1 OR 2: {len(scan.to_arrow())} (== {len_1 + len_2} is {len_3 == len_1 + len_2})") ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org