paultmathew opened a new pull request, #3387:
URL: https://github.com/apache/iceberg-python/pull/3387
# Rationale for this change
`Transaction.upsert` builds its scan `row_filter` from `join_cols`
alone via `create_match_filter`. When the partition spec sources from
columns NOT in `join_cols` (a common pattern for append-only event
logs partitioned by time but keyed by composite IDs), two compounding
problems arise at scan-plan time:
- `inclusive_projection` collapses the predicate to `AlwaysTrue`
against the partition spec, so **partition pruning never fires** and
every file in the table is listed (related: #2138, #2159, #3129).
- Per-file metrics evaluation of the `Or(And(EqualTo, EqualTo), …)`
predicate on UUID-shaped key columns **can't prune either** —
per-file `lower_bound` / `upper_bound` stats span essentially the
full UUID space, so the metrics evaluator passes every file.
The result is a full-table scan on every upsert. For tables with 10k+
partitions this means minutes of planning and gigabytes of reads per call.
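The second problem can be illustrated without PyIceberg. The sketch below is a toy model, not the library's metrics evaluator: it builds min/max bounds for files full of random UUID-shaped keys and checks how many files a simple bounds test could skip for one probe key. The file count, rows per file, seed, and helper names are all assumptions chosen for illustration.

```python
import random
import uuid


def random_uuid(rng: random.Random) -> str:
    # Build a UUID-shaped string from a seeded RNG so the run is reproducible.
    return str(uuid.UUID(int=rng.getrandbits(128)))


def file_bounds(rng: random.Random, rows_per_file: int) -> tuple[str, str]:
    # Model the (lower_bound, upper_bound) stats a data file would carry
    # for a key column filled with random UUIDs.
    keys = [random_uuid(rng) for _ in range(rows_per_file)]
    return min(keys), max(keys)


rng = random.Random(0)
bounds = [file_bounds(rng, rows_per_file=1000) for _ in range(200)]

# A fixed probe key from the middle of the UUID space (hypothetical value).
probe = "80000000-0000-4000-8000-000000000000"

# A min/max check can only skip a file when the probe lies outside its bounds.
unprunable = sum(1 for lo, hi in bounds if lo <= probe <= hi)
print(f"{unprunable} of {len(bounds)} files survive min/max pruning")
```

Because each file's keys are spread across the whole key space, its bounds bracket almost any probe value, so a bounds check alone leaves every file in the plan.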
## What this PR does
Two complementary optimisations to `Transaction.upsert`:
1. **Partition-range augmentation**. New helper
`upsert_util.augment_filter_with_partition_ranges` derives
`[min, max]` predicates from `df` for every partition source column
present in the frame and ANDs them into the row filter.
`inclusive_projection` then projects each range through its
partition transform (`hours`, `days`, `months`, `years`, `identity`,
`truncate`) at scan-plan time, enabling manifest- and file-level
pruning.
2. **Column-projection for the insert-only path**. When
`when_matched_update_all=False` the consumer loop only reads
`join_cols` off each destination batch (to build the per-batch
source-side match filter). Passing `selected_fields=tuple(join_cols)`
to `DataScan` lets the parquet reader prune wide non-key columns.
The existing `_projected_field_ids.union(extract_field_ids(...))`
keeps the partition-range predicate's columns readable.
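The column-projection choice in item 2 reduces to a small decision. The following is a minimal sketch of that decision only; `scan_selected_fields` is a hypothetical name for illustration and not a function in PyIceberg or in this PR.

```python
def scan_selected_fields(join_cols: list[str], when_matched_update_all: bool) -> tuple[str, ...]:
    # The update path rewrites whole rows, so it keeps the full projection.
    if when_matched_update_all:
        return ("*",)
    # The insert-only path only compares keys, so the key columns suffice.
    return tuple(join_cols)


print(scan_selected_fields(["conversation_id", "id"], when_matched_update_all=False))
print(scan_selected_fields(["conversation_id", "id"], when_matched_update_all=True))
```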
### Correctness guards
The augmentation skips a column in three cases:
- The partition source column isn't present in `df` (no bound to derive).
- The column is entirely null in `df` (no `min`/`max`).
- The column has **any** null in `df`. A non-null `GreaterThanOrEqual`
predicate would exclude NULL-partition destination rows whose
`join_cols` values may collide with null-partition source rows, so the
helper skips pruning rather than emit an unsafe predicate.
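The third guard can be seen in a toy model. The sketch below uses plain dictionaries instead of Iceberg tables and assumes SQL-style semantics where a comparison against NULL is never true; the row values and the `in_range` helper are made up for illustration.

```python
from datetime import datetime


def in_range(value, lo, hi):
    # A NULL partition value fails any range predicate.
    return value is not None and lo <= value <= hi


destination = [
    {"id": "a", "created_at": datetime(2024, 1, 1, 10)},
    {"id": "b", "created_at": None},  # lives in the NULL partition
]
source = [
    {"id": "a", "created_at": datetime(2024, 1, 1, 10)},
    {"id": "b", "created_at": None},
    {"id": "c", "created_at": datetime(2024, 1, 1, 12)},
]

# Derive the range from the non-null source values only (the unsafe variant).
non_null = [r["created_at"] for r in source if r["created_at"] is not None]
lo, hi = min(non_null), max(non_null)

# The range-filtered scan never sees destination row "b" ...
scanned_ids = {r["id"] for r in destination if in_range(r["created_at"], lo, hi)}

# ... so an upsert would treat source row "b" as new and insert a duplicate.
would_insert = [r["id"] for r in source if r["id"] not in scanned_ids]
print(would_insert)
```

Here `"c"` is a legitimate insert, but `"b"` already exists in the destination; skipping the range for a column with nulls avoids that duplicate.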
When `min == max`, an `EqualTo` is emitted instead of the range pair.
Multiple partition fields sourcing from the same column emit one
source-column range; `inclusive_projection` projects through each
partition field independently at scan time. Bucket and other
non-monotonic transforms return `None` from their `project` method on
inequalities, so the projection contributes `AlwaysTrue` for them,
which is harmless.
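The guards and the `EqualTo` shortcut above can be sketched in plain Python. This is not the PR's `augment_filter_with_partition_ranges`: predicates are modelled as tuples rather than PyIceberg expressions, the frame is a dict of lists rather than an Arrow table, and `column_range` and `augment` are hypothetical names.

```python
from typing import Any, Optional

Predicate = tuple  # e.g. (">=", "created_at", 1) or ("and", left, right)


def column_range(name: str, values: list[Any]) -> Optional[Predicate]:
    # Any null (including all-null) means no safe bound can be derived.
    if not values or any(v is None for v in values):
        return None
    lo, hi = min(values), max(values)
    if lo == hi:
        return ("==", name, lo)  # single value: equality instead of a range pair
    return ("and", (">=", name, lo), ("<=", name, hi))


def augment(row_filter: Predicate, partition_source_cols: list[str],
            frame: dict[str, list[Any]]) -> Predicate:
    seen: set[str] = set()
    for name in partition_source_cols:
        # One range per source column, even if several partition fields use it;
        # columns absent from the frame contribute nothing.
        if name in seen or name not in frame:
            continue
        seen.add(name)
        bounds = column_range(name, frame[name])
        if bounds is not None:
            row_filter = ("and", row_filter, bounds)
    return row_filter


base = ("==", "id", "x")
print(augment(base, ["created_at", "created_at"], {"created_at": [3, 1, 2]}))
```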
## Are these changes tested?
Yes:
- 13 new unit tests in `tests/table/test_upsert.py`:
  - Pure-function tests for `augment_filter_with_partition_ranges`
    (unpartitioned, missing column, all-null, partial-null,
    single-value, range, multi-field-sharing-source).
  - End-to-end upsert semantics with partition spec NOT in
    `join_cols`, IN `join_cols`, and unpartitioned.
  - `DataScan.plan_files()` count assertion against a deterministically
    seeded table that defeats per-file metrics pruning, confirming that
    the augmentation prunes where the original predicate does not.
  - `selected_fields` projection assertions for both
    `when_matched_update_all=True` (legacy `('*',)`) and `=False`
    (narrow `join_cols`-only).
  - End-to-end upsert with `when_matched_update_all=True` against a
    wide table to confirm column projection doesn't break the update
    path.
- 23 existing upsert tests still pass.
### Smoke test — real Iceberg-on-S3 + Glue table
Run against a real Iceberg table representative of the workload this
optimisation targets.
**Stack**
- `pyiceberg.catalog.glue.GlueCatalog`
- AWS S3 warehouse, parquet data files
- Iceberg format v2
**Target table**
- Write mode: append-only event log
- `unique_keys`: `[conversation_id, id]` (composite UUID/string key)
- `partition_spec`: `hours(created_at)`
- Size at the test snapshot: **~10,450 data files, ~3.2 GiB total**
- Hourly partitions over ~15 months of history
- Avg file size: ~0.32 MiB (post hourly OPTIMIZE compaction)
- Schema (6 columns):
  - `conversation_id` (string, UUID v4), key
  - `id` (string, UUID v4), key
  - `event` (string, short tag, ~10 B/row)
  - `log` (string, JSON payload, **~400 B/row median**)
  - `created_at` (timestamp[us, UTC]), partition source
  - `version` (int32)
**Source synthesis** (two modes for the comparison):
- `synthetic`: random UUIDs; `conversation_id` drawn from a pool sized
`rows/30` so leading-key cardinality matches realistic parent-child
distribution; `created_at` uniformly in `[now − hours, now]`. Keys
don't overlap the destination → the metrics evaluator rejects every
file at scan-plan time, so both plans return 0 files. **Isolates the
planning cost.**
- `from-destination`: reads N recent rows from the destination via a
`created_at` range scan, used as the source. Keys DO overlap →
exercises the **file-count reduction** and **column-read** effect.
**Results** (read-only via `DataScan.plan_files()` and
`to_arrow_batch_reader()`):
| Scenario | Original plan | Augmented plan | Plan-time win | Wide read | Narrow read |
|---|---|---|---|---|---|
| 1 000 synthetic rows / 24 h | 0 / 10 452 (454.04 s) | 0 / 10 452 (3.11 s) | **146×** | — | — |
| 1 000 dest-sampled rows / 24 h | (skipped, 7-min baseline) | **7 / 10 452 (99.93%)** | — | 493 KiB / 1 000 rows | 68 KiB / 1 000 rows (**86% smaller**) |
| 10 000 dest-sampled rows / 168 h (catch-up flush) | (skipped) | **58 / 10 492 (99.4%)** | — | — | — |
The 146× plan-time win is on a 1 000-disjunct predicate against a
~10k-file table. The original cost scales linearly with table file
count and predicate disjunct count, whereas the augmented cost scales
with the source's `created_at` span.
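To make the span dependence concrete, the sketch below applies Iceberg's `hours` transform (whole hours since the Unix epoch, UTC) to both ends of a timestamp range and counts the hourly partition values in between. The helper names and example timestamps are assumptions for illustration; the count is an upper bound on candidate partitions, not a prediction of how many files a real plan returns.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def hours_transform(ts: datetime) -> int:
    # Whole hours elapsed since 1970-01-01T00:00:00Z (floor division).
    return (ts - EPOCH) // timedelta(hours=1)


def hourly_partitions_in_range(lo: datetime, hi: datetime) -> int:
    # Every hour ordinal between the projected bounds is a candidate partition.
    return hours_transform(hi) - hours_transform(lo) + 1


lo = datetime(2024, 1, 1, 0, 30, tzinfo=timezone.utc)
day_span = hourly_partitions_in_range(lo, lo + timedelta(hours=24))
week_span = hourly_partitions_in_range(lo, lo + timedelta(hours=168))
print(day_span, week_span)
```

The candidate count depends only on the width of the `[min, max]` range, not on how many hourly partitions the table has accumulated.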
The 86% byte reduction is dominated by skipping the `log` (JSON
payload) column at the parquet reader — that one column carries ~80%
of the row width on this table.
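The headline figures follow directly from the numbers in the results table; a quick check of the arithmetic, using only values reported above:

```python
# Plan-time win: original vs augmented planning time for the synthetic run.
plan_speedup = 454.04 / 3.11

# Share of files pruned in the 1 000-row dest-sampled run (7 of 10 452 kept).
files_pruned = 1 - 7 / 10_452

# Byte reduction from the narrow read (68 KiB) vs the wide read (493 KiB).
bytes_saved = 1 - 68 / 493

print(f"{plan_speedup:.0f}x, {files_pruned:.2%} pruned, {bytes_saved:.0%} smaller")
```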
For a representative larger flush — 1.16M source rows spanning ~24 h
— extrapolating both wins reduces the destination-scan working set
from multiple GiB (which is OOM-territory on 8 GiB worker pods) down
to tens of MiB.
## Are there any user-facing changes?
No API change. The optimisation is purely internal to
`Transaction.upsert`:
- The new helper is exported from `pyiceberg.table.upsert_util` for
testability but isn't part of the public API.
- `selected_fields=tuple(join_cols)` is passed conditionally inside the
method — no signature change to `Table.upsert` or
`Transaction.upsert`.
## Related
- Relates to #2138 (partition-pruning suggestion by `@koenvo`), #2159
(umbrella slow-upsert tracker), #3129 (recent: `create_match_filter`
+ no-partition-prune).
- Complementary to (closed-stale) #2943's "coarse match filter"
approach — that PR shrinks the row predicate itself; this one adds
partition pruning the row predicate can't trigger on its own. Both
can coexist.
##### Was generative AI tooling used to co-author this PR?
- [X] Yes — Claude (Cursor agent)