paultmathew opened a new pull request, #3335:
URL: https://github.com/apache/iceberg-python/pull/3335

   # Rationale for this change
   
   Closes #2152 and addresses the long-standing memory problem reported in #1004, which was independently rediscovered in dlt-hub/dlt#3753.
   
   `Table.append(df)` and `Table.overwrite(df)` currently require a fully materialised `pa.Table`. For large or unbounded inputs, this means the entire dataset must be loaded into memory before writing, which fails outright once the input outgrows available memory. It has been a recurring complaint since #1004 (Aug 2024). The reference Java implementation has streaming append, and iceberg-go shipped it in iceberg-go#369 (Apr 2025). Python is the last major SDK without it.
   
   This PR adds `pa.RecordBatchReader` as a valid input to 
`Table.append/overwrite` (and `Transaction.append/overwrite`). The reader is 
consumed lazily, microbatched into Parquet files of approximately 
`write.target-file-size-bytes` (default 512 MiB) by the new 
`bin_pack_record_batches` helper, and committed in a single snapshot via the 
existing `fast_append` pipeline. Memory stays bounded by `target-file-size × 
N_workers` rather than the full input size.
   
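   ```python
   import pyarrow as pa

   # A RecordBatchReader is single-pass: build a fresh one for each write.
   reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
   tbl.append(reader)        # ← streams, doesn't materialise
   reader = pa.RecordBatchReader.from_batches(schema, other_batch_iter)  # hypothetical second source
   tbl.overwrite(reader)     # ← also supported
   ```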
   
   ## Scope (unpartitioned only)
   
   Streaming into a partitioned table raises `NotImplementedError` pointing back to #2152. Partitioned support is genuinely the harder case: it needs design discussion around partition cardinality bounds, per-partition rolling writers, and idempotency on retry. I'm therefore proposing to land this work in three reviewable PRs:
   
   1. **This PR** — API + unpartitioned + buffered byte-budget bin-packing.
   2. **Follow-up** — switch internals to a rolling `pq.ParquetWriter` + 
`OutputStream.tell()` (now possible since #2998) for constant-memory streaming. 
No API change.
   3. **Later** — partitioned streaming, after design discussion on #2152.
   
   This mirrors iceberg-go#369's staging: ship the unpartitioned API first, 
iterate from there.
   
   ## Are these changes tested?
   
   Yes, at four layers.
   
   **1. Unit tests** (`tests/io/test_pyarrow.py`) — 4 new tests for 
`bin_pack_record_batches` covering single-bin, microbatched, empty input, and 
lazy generator consumption. Verified the bin-packer drains its input one batch 
at a time rather than materialising the whole iterator.
   
   **2. End-to-end behaviour tests** 
(`tests/catalog/test_catalog_behaviors.py`) — 8 new tests parametrised across 
all three in-process catalog backends (`memory`, `sql`, `sql_without_rowcount`) 
→ 24 test runs covering:
   - append, overwrite, microbatch verification (multiple files in one snapshot)
   - empty reader, partitioned-table-raises, invalid-input-rejected
   - reader consumed exactly once (no double-pass regression)
   - schema mismatch raises before any data files are written (no S3 orphans)
   
   **3. Integration tests** (`tests/integration/test_writes/test_writes.py`): 6 new Spark-readback tests for v1 + v2 format versions covering append, overwrite, and multi-file microbatch. These confirm, against the docker-compose stack, that Spark can read tables written via the streaming path.
   
   **4. Smoke test on real AWS infrastructure**: verified end-to-end against AWS Glue + S3 in our staging account, including:
   - 100,000-row streaming append in 17s
   - 20-file microbatched commit
   - Athena read-back: `COUNT(*)` and `MAX(id)` matched the input exactly
   - Schema-mismatch rejection leaving no orphan files
   
   Full unit suite: 3647 passed. Full integration suite: 122 passed, 1 skipped.
   
   ## Are there any user-facing changes?
   
   Yes, intentionally:
   
   - `Transaction.append(df)`, `Transaction.overwrite(df)`, `Table.append(df)`, 
`Table.overwrite(df)` accept `pa.Table | pa.RecordBatchReader`.
   - The `ValueError` raised on bad input changes from `"Expected PyArrow 
table, got: ..."` to `"Expected pa.Table or pa.RecordBatchReader, got: ..."`. 
Updated `test_invalid_arguments` accordingly.
   - New module-level helper `bin_pack_record_batches` in 
`pyiceberg.io.pyarrow` (sibling of `bin_pack_arrow_table`).
   - Docs: new "Streaming writes from a RecordBatchReader" subsection in 
`mkdocs/docs/api.md`.
   - Docstrings on `Transaction.append/overwrite` document the retry semantics: 
a `RecordBatchReader` is consumed once, so retry strategies (factory callable, 
or two-stage write-then-`add_files`) are the caller's responsibility.
   
   ## Trade-off explicitly acknowledged
   
   The streaming path buffers up to `target_file_size` (default 512 MiB) of in-memory RecordBatches before flushing them to a Parquet file. Peak memory therefore scales with `N_workers × target_file_size` (e.g. ~4 GiB with 8 workers at the 512 MiB default) rather than staying constant. This is materially better than the status quo of materialising everything, but it is not yet constant memory. That is PR 2's job: switching to a rolling `ParquetWriter` per file becomes natural now that `OutputStream.tell()` exists (#2998). This trade-off is documented in the docstring.
   
   ## Related
   
   - Closes #2152
   - Addresses #1004 (closed by reporter without a fix)
   - Reference implementation: iceberg-go#369
   - Downstream consumer hitting the same problem: dlt-hub/dlt#3753 
(independent rediscovery of same approach)
   - Builds on the maintainer-blessed pattern from #1742's review 
(`_dataframe_to_data_files` + `fast_append.append_data_file()`, no separate 
`write_parquet` API)
   - Companion fix (already merged separately): test-state isolation in 
`test_write_optional_list`
   

