paultmathew opened a new pull request, #3335:
URL: https://github.com/apache/iceberg-python/pull/3335
# Rationale for this change
Closes #2152 and addresses the long-standing memory problem reported in #1004
and rediscovered in dlt-hub/dlt#3753.
`Table.append(df)` and `Table.overwrite(df)` currently require a fully
materialised `pa.Table`. For large or unbounded inputs this means loading the
entire dataset into memory before writing. That fails outright once the input
no longer fits in RAM, and it has been a recurring complaint since #1004
(Aug 2024). The reference Java implementation has streaming append, and
iceberg-go shipped it in iceberg-go#369 (Apr 2025). Python is the last major
SDK without it.
This PR adds `pa.RecordBatchReader` as a valid input to
`Table.append/overwrite` (and `Transaction.append/overwrite`). The reader is
consumed lazily, microbatched into Parquet files of approximately
`write.target-file-size-bytes` (default 512 MiB) by the new
`bin_pack_record_batches` helper, and committed in a single snapshot via the
existing `fast_append` pipeline. Memory stays bounded by `target-file-size ×
N_workers` rather than the full input size.
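```python
import pyarrow as pa

# `schema` (a pa.Schema), `batch_iter()` (a generator of matching pa.RecordBatch)
# and `tbl` (a loaded pyiceberg Table) are placeholders for your own objects.
tbl.append(pa.RecordBatchReader.from_batches(schema, batch_iter()))     # ← streams, doesn't materialise
tbl.overwrite(pa.RecordBatchReader.from_batches(schema, batch_iter()))  # ← also supported; readers are single-use, so build a fresh one
```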
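The byte-budget microbatching described above can be illustrated with a greedy packer. This is a hypothetical `bin_pack_batches_sketch`, not the PR's `bin_pack_record_batches`, whose exact signature and packing rule may differ. It only assumes each item has an `nbytes` attribute (as `pa.RecordBatch` does), and it pulls one batch at a time, so only the bin currently being filled is held in memory:

```python
from typing import Any, Iterable, Iterator, List


def bin_pack_batches_sketch(batches: Iterable[Any], target_bytes: int) -> Iterator[List[Any]]:
    """Hypothetical greedy packer: group batches into bins of roughly `target_bytes`.

    Batches are pulled lazily, one at a time; a bin is emitted as soon as its
    accumulated `nbytes` reaches the target, so a bin may overshoot by one batch.
    """
    current: List[Any] = []
    size = 0
    for batch in batches:
        current.append(batch)
        size += batch.nbytes
        if size >= target_bytes:
            yield current
            current, size = [], 0
    if current:  # flush the final, partially filled bin
        yield current
```

With a `pa.RecordBatchReader`, which yields batches when iterated, `for bin_ in bin_pack_batches_sketch(reader, 512 * 1024 * 1024): ...` would hand each bin to a file writer as soon as it fills. An empty reader yields no bins, so no data files would be produced.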
## Scope (unpartitioned only)
Streaming into a partitioned table raises `NotImplementedError` pointing
back to #2152. Partitioned support is genuinely the harder case: it needs
design discussion around partition cardinality bounds, per-partition rolling
writers, and idempotency on retry. I'm therefore proposing to land this work
in three reviewable PRs:
1. **This PR** — API + unpartitioned + buffered byte-budget bin-packing.
2. **Follow-up** — switch internals to a rolling `pq.ParquetWriter` +
`OutputStream.tell()` (now possible since #2998) for constant-memory streaming.
No API change.
3. **Later** — partitioned streaming, after design discussion on #2152.
This mirrors iceberg-go#369's staging: ship the unpartitioned API first,
iterate from there.
## Are these changes tested?
Yes, at four layers:
**1. Unit tests** (`tests/io/test_pyarrow.py`) — 4 new tests for
`bin_pack_record_batches` covering single-bin, microbatched, empty input, and
lazy generator consumption. Verified the bin-packer drains its input one batch
at a time rather than materialising the whole iterator.
**2. End-to-end behaviour tests**
(`tests/catalog/test_catalog_behaviors.py`) — 8 new tests parametrised across
all three in-process catalog backends (`memory`, `sql`, `sql_without_rowcount`)
→ 24 test runs covering:
- append, overwrite, microbatch verification (multiple files in one snapshot)
- empty reader, partitioned-table-raises, invalid-input-rejected
- reader consumed exactly once (no double-pass regression)
- schema mismatch raises before any data files are written (no S3 orphans)
**3. Integration tests** (`tests/integration/test_writes/test_writes.py`) —
6 new Spark read-back tests across format versions v1 and v2, covering append,
overwrite, and multi-file microbatch. Run against the docker-compose stack,
they confirm that Spark can read tables written via the streaming path.
**4. Smoke test on a real production stack** — verified end-to-end against
AWS Glue + S3 in our staging account, including:
- 100,000-row streaming append in 17s
- 20-file microbatched commit
- Athena read-back: `COUNT(*)` and `MAX(id)` matched the input exactly
- Schema-mismatch rejection leaving no orphan files
Full unit suite: 3647 passed. Full integration suite: 122 passed, 1 skipped.
## Are there any user-facing changes?
Yes, intentionally:
- `Transaction.append(df)`, `Transaction.overwrite(df)`, `Table.append(df)`,
`Table.overwrite(df)` accept `pa.Table | pa.RecordBatchReader`.
- The `ValueError` raised on bad input changes from `"Expected PyArrow
table, got: ..."` to `"Expected pa.Table or pa.RecordBatchReader, got: ..."`.
Updated `test_invalid_arguments` accordingly.
- New module-level helper `bin_pack_record_batches` in
`pyiceberg.io.pyarrow` (sibling of `bin_pack_arrow_table`).
- Docs: new "Streaming writes from a RecordBatchReader" subsection in
`mkdocs/docs/api.md`.
- Docstrings on `Transaction.append/overwrite` document the retry semantics:
a `RecordBatchReader` is consumed once, so retry strategies (factory callable,
or two-stage write-then-`add_files`) are the caller's responsibility.
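The factory-callable retry strategy mentioned in the last bullet can be sketched as follows. `append_with_retry` is a hypothetical helper, not part of this PR or of PyIceberg; the essential point is that the caller passes a function that builds a reader, never a reader itself:

```python
from typing import Callable, Tuple, Type, TypeVar

R = TypeVar("R")


def append_with_retry(
    append: Callable[[R], None],
    make_reader: Callable[[], R],
    attempts: int = 3,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> None:
    """Hypothetical helper: call `append(make_reader())` up to `attempts` times.

    A fresh reader is built for every attempt, because a RecordBatchReader is
    consumed once and a failed attempt may already have drained part of it.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            append(make_reader())
            return
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
```

With PyIceberg this might be called as `append_with_retry(tbl.append, lambda: pa.RecordBatchReader.from_batches(schema, batch_iter()), retry_on=(CommitFailedException,))`, with `CommitFailedException` imported from `pyiceberg.exceptions` and `schema` / `batch_iter()` as placeholders. Which failures are actually safe to retry remains the caller's decision.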
## Trade-off explicitly acknowledged
The streaming path buffers up to `target_file_size` (default 512 MiB) of
in-memory RecordBatches before flushing them to a Parquet file. Peak memory
therefore scales with `N_workers × target_file_size` (~4 GiB at defaults)
rather than staying constant. This is materially better than the status quo of
materialising everything, but it is not yet constant memory. That is PR 2's
job: switching to a rolling `ParquetWriter` per file becomes natural now that
`OutputStream.tell()` exists (#2998). This trade-off is documented in the
docstring.
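As a quick check of the ~4 GiB figure: it matches eight concurrent writers, each buffering one full 512 MiB bin. The worker count of eight is inferred from the quoted numbers, not taken from PyIceberg's configuration:

$$
N_\text{workers} \times \text{target\_file\_size} = 8 \times 512\ \text{MiB} = 4096\ \text{MiB} = 4\ \text{GiB}
$$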
## Related
- Closes #2152
- Addresses #1004 (closed by reporter without a fix)
- Reference implementation: iceberg-go#369
- Downstream consumer hitting the same problem: dlt-hub/dlt#3753
(independent rediscovery of the same approach)
- Builds on the maintainer-blessed pattern from #1742's review
(`_dataframe_to_data_files` + `fast_append.append_data_file()`, no separate
`write_parquet` API)
- Companion fix (already merged separately): test-state isolation in
`test_write_optional_list`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.