Erigara commented on code in PR #2029:
URL: https://github.com/apache/iceberg-python/pull/2029#discussion_r2100082961
##########
pyiceberg/expressions/visitors.py:
##########
@@ -894,12 +895,17 @@ def visit_unbound_predicate(self, predicate:
UnboundPredicate[L]) -> BooleanExpr
def visit_bound_predicate(self, predicate: BoundPredicate[L]) ->
BooleanExpression:
file_column_name =
self.file_schema.find_column_name(predicate.term.ref().field.field_id)
+ field_name = predicate.term.ref().field.name
if file_column_name is None:
# In the case of schema evolution, the column might not be present
# in the file schema when reading older data
if isinstance(predicate, BoundIsNull):
return AlwaysTrue()
+ # Projected fields are only available for identity partition fields
+ # Which mean that partition pruning excluded partition field which
evaluates to false
+ elif field_name in self.projected_missing_fields:
+ return AlwaysTrue()
Review Comment:
Here is a script that illustrates my point:
<details>
<summary>bug.py</summary>
```python
#!/usr/bin/env python
"""Reproduce wrong scan results when filtering on an identity-partition column
that is physically absent from the data files.

The VendorID column is dropped from the parquet files and only recorded as an
identity partition value, so predicates on it must be answered from partition
metadata (projected field), not from the file schema.
"""
import polars as pl
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg import expressions as expr
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table import _parquet_files_to_data_files
from pyiceberg.table.name_mapping import NameMapping, MappedField
from pyiceberg.io.pyarrow import pyarrow_to_schema

catalog = load_catalog(
    "default",
    **{
        "type": "in-memory",
        "warehouse": "warehouse/",
    },
)
catalog.create_namespace_if_not_exists("default")

df = pl.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet")

# write filtered data — VendorID is dropped so it exists only as a partition value
file_vendor_1 = "warehouse/VendorID=1_yellow_tripdata_2025-01.parquet"
df.filter(pl.col("VendorID") == 1).drop("VendorID").write_parquet(file_vendor_1)

# create iceberg table partitioned by identity(VendorID)
schema = df.to_arrow().schema
mapping = NameMapping([MappedField(field_id=i, names=[name]) for i, name in enumerate(schema.names, 1)])
schema = pyarrow_to_schema(schema, mapping)
table = catalog.create_table_if_not_exists(
    "default.taxi",
    schema=schema,
    partition_spec=PartitionSpec(
        PartitionField(
            source_id=schema.find_field("VendorID").field_id,
            field_id=schema.find_field("VendorID").field_id,
            transform=IdentityTransform(),
            name="VendorID",
        ),
    ),
    properties={"schema.name-mapping.default": mapping.model_dump_json()},
)

# add_files
files = [file_vendor_1]
with table.transaction() as tx:
    with tx.update_snapshot().fast_append() as fast_append:
        data_files = _parquet_files_to_data_files(
            table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
        )
        for data_file in data_files:
            # set partition for VendorID
            # current file has only one partition anyway
            # VendorID = 1
            data_file.partition[0] = 1
            fast_append.append_data_file(data_file)

# query with projected field in predicate
scan = table.scan(row_filter=expr.And(
    expr.EqualTo("VendorID", 1),
    expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
    expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
))
print(f"WITH PROJECTED COLUMN LEN: {len(scan.to_arrow())}")

# query without projected field in predicate
scan = table.scan(row_filter=expr.And(
    expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
    expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
))
# fixed typo: COLUM -> COLUMN
print(f"WITHOUT PROJECTED COLUMN LEN: {len(scan.to_arrow())}")

# add second partition
file_vendor_2 = "warehouse/VendorID=2_yellow_tripdata_2025-01.parquet"
df.filter(pl.col("VendorID") == 2).drop("VendorID").write_parquet(file_vendor_2)

# add_files
files = [file_vendor_2]
with table.transaction() as tx:
    with tx.update_snapshot().fast_append() as fast_append:
        data_files = _parquet_files_to_data_files(
            table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
        )
        for data_file in data_files:
            # set partition for VendorID
            # current file has only one partition anyway
            # VendorID = 2
            data_file.partition[0] = 2
            fast_append.append_data_file(data_file)

expr_1 = expr.And(
    expr.EqualTo("VendorID", 1),
    expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
    expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
)
scan = table.scan(row_filter=expr_1)
len_1 = len(scan.to_arrow())
print(f"1: VendorID = 1 AND tpep_pickup_datetime BETWEEN 2025-01-01T12:00:00 AND 2025-01-01T18:00:00: {len_1}")

expr_2 = expr.And(
    expr.EqualTo("VendorID", 2),
    expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-02T12:00:00'),
    expr.LessThan("tpep_pickup_datetime", '2025-01-02T18:00:00'),
)
scan = table.scan(row_filter=expr_2)
len_2 = len(scan.to_arrow())
print(f"2: VendorID = 2 AND tpep_pickup_datetime BETWEEN 2025-01-02T12:00:00 AND 2025-01-02T18:00:00: {len_2}")

# if the projected-field predicate were evaluated correctly, the OR of the two
# disjoint filters must return exactly len_1 + len_2 rows
expr_3 = expr.Or(expr_1, expr_2)
scan = table.scan(row_filter=expr_3)
len_3 = len(scan.to_arrow())
# reuse len_3 instead of re-executing the (expensive) scan a second time
print(f"3: 1 OR 2: {len_3} (== {len_1 + len_2} is {len_3 == len_1 + len_2})")
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]