abnobdoss commented on code in PR #3387:
URL: https://github.com/apache/iceberg-python/pull/3387#discussion_r3300276591
##########
tests/table/test_upsert.py:
##########
@@ -888,3 +892,660 @@ def test_upsert_snapshot_properties(catalog: Catalog) ->
None:
for snapshot in snapshots[initial_snapshot_count:]:
assert snapshot.summary is not None
assert snapshot.summary.additional_properties.get("test_prop") ==
"test_value"
+
+
+# ---------------------------------------------------------------------------
+# Partition-range augmentation for upsert row filters.
+#
+# ``Transaction.upsert`` builds its scan ``row_filter`` from ``join_cols``
+# alone via ``create_match_filter``. When the partition spec sources from
+# columns NOT in ``join_cols`` (a common pattern for append-only event logs
+# partitioned by time but keyed by composite IDs), ``inclusive_projection``
+# collapses the entire predicate to ``AlwaysTrue`` against the partition
+# spec and ``DataScan.plan_files`` falls through to a full table scan.
+#
+# ``augment_filter_with_partition_ranges`` derives ``[min, max]`` predicates
+# from ``df`` for every partition source column present in the frame and
+# ANDs them into the row filter. Iceberg's inclusive projection then
+# projects each range through the partition transform when planning the
+# scan, enabling manifest- and file-level pruning.
+#
+# See related issues #2138, #2159, #3129.
+# ---------------------------------------------------------------------------
+
+
+class TestAugmentFilterWithPartitionRanges:
+ """Pure-function tests for ``augment_filter_with_partition_ranges``.
+
+ Asserts the structural shape of the augmented predicate. End-to-end
+ file-pruning behaviour is exercised by the upsert integration tests
+ below.
+ """
+
+ @staticmethod
+ def _orders_schema() -> Schema:
+ return Schema(
+ NestedField(1, "order_id", IntegerType(), required=True),
+ NestedField(2, "order_date", DateType(), required=True),
+ NestedField(3, "order_type", StringType(), required=True),
+ )
+
+ @staticmethod
+ def _orders_pa_schema() -> pa.Schema:
+ return pa.schema(
+ [
+ pa.field("order_id", pa.int32(), nullable=False),
+ pa.field("order_date", pa.date32(), nullable=False),
+ pa.field("order_type", pa.string(), nullable=False),
+ ]
+ )
+
+ def _df(self, rows: list[dict[str, object]]) -> pa.Table:
+ return pa.Table.from_pylist(rows, schema=self._orders_pa_schema())
+
+ def test_unpartitioned_spec_returns_input_unchanged(self) -> None:
+ """Tables without a partition spec have nothing to project through.
+ The augmentation must short-circuit and hand back the exact
+ ``matched_predicate`` object — no allocation, no semantic change."""
+ df = self._df([{"order_id": 1, "order_date": datetime.date(2026, 1,
1), "order_type": "A"}])
+ matched = create_match_filter(df, ["order_id"])
+ augmented = augment_filter_with_partition_ranges(
+ matched_predicate=matched,
+ df=df,
+ schema=self._orders_schema(),
+ spec=UNPARTITIONED_PARTITION_SPEC,
+ )
+ assert augmented is matched
+
+ def test_partition_source_column_not_in_df_skipped(self) -> None:
+ """Source frames that don't contain the partition source column
+ can't contribute a bound — the augmentation has to skip rather
+ than guess. Returns ``matched_predicate`` unchanged so the
+ existing scan behaviour applies."""
+ df_no_date = pa.Table.from_pylist(
+ [{"order_id": 1, "order_type": "A"}],
+ schema=pa.schema(
+ [
+ pa.field("order_id", pa.int32(), nullable=False),
+ pa.field("order_type", pa.string(), nullable=False),
+ ]
+ ),
+ )
+ matched = create_match_filter(df_no_date, ["order_id"])
+ spec = PartitionSpec(PartitionField(source_id=2, field_id=1000,
transform=IdentityTransform(), name="order_date"))
+ augmented = augment_filter_with_partition_ranges(
+ matched_predicate=matched,
+ df=df_no_date,
+ schema=self._orders_schema(),
+ spec=spec,
+ )
+ assert augmented == matched
+
+ def test_partition_source_column_all_nulls_skipped(self) -> None:
+ """When every value of the partition source column in ``df`` is
+ null, there is no meaningful ``min`` / ``max`` to bound the
+ predicate. Skip rather than emit a vacuous augmentation."""
+ df = pa.Table.from_pylist(
+ [{"order_id": 1, "order_date": None, "order_type": "A"}],
+ schema=pa.schema(
+ [
+ pa.field("order_id", pa.int32(), nullable=False),
+ pa.field("order_date", pa.date32(), nullable=True),
+ pa.field("order_type", pa.string(), nullable=False),
+ ]
+ ),
+ )
+ matched = create_match_filter(df, ["order_id"])
+ spec = PartitionSpec(PartitionField(source_id=2, field_id=1000,
transform=IdentityTransform(), name="order_date"))
+ augmented = augment_filter_with_partition_ranges(
+ matched_predicate=matched,
+ df=df,
+ schema=self._orders_schema(),
+ spec=spec,
+ )
+ assert augmented == matched
+
+ def test_partition_source_column_some_nulls_skipped(self) -> None:
+ """Correctness guard: a partial-null source column cannot use a
+ non-null ``GreaterThanOrEqual`` augmentation because destination
+ rows whose partition value is NULL would be excluded from the
+ match scan even though their ``(key)`` may collide with the
+ null-partition source rows. Skip pruning over emitting an unsafe
+ predicate."""
+ df = pa.Table.from_pylist(
+ [
+ {"order_id": 1, "order_date": datetime.date(2026, 1, 1),
"order_type": "A"},
+ {"order_id": 2, "order_date": None, "order_type": "B"},
+ ],
+ schema=pa.schema(
+ [
+ pa.field("order_id", pa.int32(), nullable=False),
+ pa.field("order_date", pa.date32(), nullable=True),
+ pa.field("order_type", pa.string(), nullable=False),
+ ]
+ ),
+ )
+ matched = create_match_filter(df, ["order_id"])
+ spec = PartitionSpec(PartitionField(source_id=2, field_id=1000,
transform=IdentityTransform(), name="order_date"))
+ augmented = augment_filter_with_partition_ranges(
+ matched_predicate=matched,
+ df=df,
+ schema=self._orders_schema(),
+ spec=spec,
+ )
+ assert augmented == matched
+
+ def test_single_value_partition_column_emits_equal_to(self) -> None:
+ """``min == max`` collapses to a single ``EqualTo`` — tighter than
+ the range pair and lets exact partition pruning fire (e.g. when
+ every source row falls in the same hourly bucket)."""
+ df = self._df(
+ [
+ {"order_id": 1, "order_date": datetime.date(2026, 1, 1),
"order_type": "A"},
+ {"order_id": 2, "order_date": datetime.date(2026, 1, 1),
"order_type": "B"},
+ ]
+ )
+ matched = create_match_filter(df, ["order_id"])
+ spec = PartitionSpec(PartitionField(source_id=2, field_id=1000,
transform=IdentityTransform(), name="order_date"))
+ augmented = augment_filter_with_partition_ranges(
+ matched_predicate=matched,
+ df=df,
+ schema=self._orders_schema(),
+ spec=spec,
+ )
+ assert augmented == And(matched, EqualTo("order_date",
datetime.date(2026, 1, 1)))
+
+ def test_range_emits_gteq_and_lteq(self) -> None:
+ """Multiple distinct values → ``GreaterThanOrEqual(min) AND
+ LessThanOrEqual(max)`` pair, AND'd onto the original matched
+ predicate. Inclusive_projection handles the partition-transform
+ projection at scan time."""
+ df = self._df(
+ [
+ {"order_id": 1, "order_date": datetime.date(2026, 1, 1),
"order_type": "A"},
+ {"order_id": 2, "order_date": datetime.date(2026, 1, 15),
"order_type": "B"},
+ {"order_id": 3, "order_date": datetime.date(2026, 2, 1),
"order_type": "C"},
+ ]
+ )
+ matched = create_match_filter(df, ["order_id"])
+ spec = PartitionSpec(PartitionField(source_id=2, field_id=1000,
transform=IdentityTransform(), name="order_date"))
+ augmented = augment_filter_with_partition_ranges(
+ matched_predicate=matched,
+ df=df,
+ schema=self._orders_schema(),
+ spec=spec,
+ )
+ assert augmented == And(
+ And(matched, GreaterThanOrEqual("order_date", datetime.date(2026,
1, 1))),
+ LessThanOrEqual("order_date", datetime.date(2026, 2, 1)),
+ )
+
+ def test_multiple_partition_fields_share_source_id_emitted_once(self) ->
None:
+ """When two partition fields source from the same column (e.g.
+ ``bucket(8, id), truncate(4, id)``), only one source-column range
+ is emitted. ``inclusive_projection`` projects through each
+ partition field independently at scan time, so a single source-
+ range predicate suffices for both."""
+ df = self._df(
+ [
+ {"order_id": 1, "order_date": datetime.date(2026, 1, 1),
"order_type": "A"},
+ {"order_id": 10, "order_date": datetime.date(2026, 1, 2),
"order_type": "B"},
+ ]
+ )
+ matched = create_match_filter(df, ["order_type"])
+
+ from pyiceberg.transforms import BucketTransform, TruncateTransform
+
+ spec = PartitionSpec(
+ PartitionField(source_id=1, field_id=1000,
transform=BucketTransform(8), name="order_id_bucket"),
+ PartitionField(source_id=1, field_id=1001,
transform=TruncateTransform(4), name="order_id_trunc"),
+ )
+ augmented = augment_filter_with_partition_ranges(
+ matched_predicate=matched,
+ df=df,
+ schema=self._orders_schema(),
+ spec=spec,
+ )
+ # Exactly one ``GreaterThanOrEqual`` + ``LessThanOrEqual`` pair on
+ # ``order_id`` — not duplicated for each partition field.
+ assert augmented == And(
+ And(matched, GreaterThanOrEqual("order_id", 1)),
+ LessThanOrEqual("order_id", 10),
+ )
+
+
+class TestUpsertPartitionPruningIntegration:
+ """End-to-end upsert against partitioned tables.
+
+ Verifies that the augmented row filter doesn't change upsert
+ semantics — ``rows_updated`` / ``rows_inserted`` match the original
+ behaviour — across the three structural cases:
+
+ 1. Partition source not in ``join_cols`` (the case the augmentation
+ fires for; biggest perf gain).
+ 2. Partition source IS in ``join_cols`` (augmentation contributes
+ redundantly but doesn't change correctness).
+ 3. Unpartitioned (augmentation is a no-op).
+ """
+
+ def test_upsert_correct_when_partition_col_not_in_join_cols(self, catalog:
Catalog) -> None:
+ """Source partitioned by ``order_date`` but keyed on ``order_id``.
+ Augmentation fires — semantics must be identical to the
+ unpartitioned baseline."""
+ identifier = "default.test_upsert_partition_not_in_join_cols"
+ _drop_table(catalog, identifier)
+
+ schema = Schema(
+ NestedField(1, "order_id", IntegerType(), required=True),
+ NestedField(2, "order_date", DateType(), required=True),
+ NestedField(3, "order_type", StringType(), required=True),
+ )
+ spec = PartitionSpec(PartitionField(source_id=2, field_id=1000,
transform=IdentityTransform(), name="order_date"))
+ table = catalog.create_table(identifier, schema=schema,
partition_spec=spec)
+
+ arrow_schema = pa.schema(
+ [
+ pa.field("order_id", pa.int32(), nullable=False),
+ pa.field("order_date", pa.date32(), nullable=False),
+ pa.field("order_type", pa.string(), nullable=False),
+ ]
+ )
+ # Initial load: ids 1-5 across two different partitions.
+ initial = pa.Table.from_pylist(
+ [
+ {"order_id": 1, "order_date": datetime.date(2026, 1, 1),
"order_type": "A"},
+ {"order_id": 2, "order_date": datetime.date(2026, 1, 1),
"order_type": "A"},
+ {"order_id": 3, "order_date": datetime.date(2026, 1, 2),
"order_type": "A"},
Review Comment:
Does this still pass if this target row keeps order_id=3 but changes
order_date to datetime.date(2026, 5, 1)?
The quirk with this change is that it seems to assume the partition column
is part of the row identity. I’m not sure when that’s valid outside cases where
the partition is derived from the join key, and in those cases I’d expect the
join-key filter to already be sufficient for pruning.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]