paultmathew opened a new pull request, #3336:
URL: https://github.com/apache/iceberg-python/pull/3336
# Rationale for this change
Builds on #3335. Please land that one first.
PR #3335 added `pa.RecordBatchReader` as a valid input to
`Table.append`/`Table.overwrite` using a buffered bin-pack approach. That
implementation has two acknowledged caveats called out in its docstrings:
1. **Memory bound**: peak memory is `N_workers ×
write.target-file-size-bytes` (~4 GiB at defaults) — better than "materialise
everything" but not constant.
2. **Byte semantics**: `write.target-file-size-bytes` is interpreted as
uncompressed in-memory Arrow bytes via the `bin_pack_record_batches` helper,
not on-disk compressed Parquet bytes. Resulting files are typically 3-10×
smaller than the property suggests.
This PR replaces the buffered approach with a rolling `pq.ParquetWriter`
driven by `OutputStream.tell()` (added in #2998 specifically for this purpose).
Both caveats go away:
```python
# pyiceberg/io/pyarrow.py::_record_batches_to_data_files (excerpt)
with output_file.create(overwrite=True) as fos:
    with pq.ParquetWriter(fos, schema=transformed_first.schema, ...) as writer:
        writer.write_batch(transformed_first, row_group_size=row_group_size)
        # fos.tell() reports compressed on-disk bytes written so far
        while fos.tell() < target_file_size:
            try:
                batch = next(batches)
            except StopIteration:
                break
            writer.write_batch(_transform(batch), row_group_size=row_group_size)
```
What this delivers:
- **Spec-correct file sizes**: `tell()` reports compressed on-disk bytes
pyarrow has emitted to the stream, so `write.target-file-size-bytes` finally
means what the spec says it means — matching the Java/Spark/Flink writers.
- **Bounded memory independent of `target_file_size`**: peak RSS is bounded
by one input batch + the Parquet page buffer (~1 MiB × columns) + the S3
multipart upload pool (~5 MiB × ~8 in-flight parts). On a real S3 stack that's
tens to a few hundred MiB, regardless of file size, dataset size, or number of
files produced. See benchmark below.
- **Each input `RecordBatch` becomes one Parquet row group**, with
`write.parquet.row-group-limit` enforced as a per-row-group cap — identical
treatment to the materialised `pa.Table` write path.
- **No public API change.** Same `tx.append(reader)` /
`tx.overwrite(reader)`. Internals only.
- **`bin_pack_record_batches` is removed** (no longer needed). Its 4 unit
tests are removed; the streaming behaviour is covered end-to-end by tests below.
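The rollover rule in the excerpt above can be tried without pyarrow or S3. The
following is a minimal stdlib-only sketch, not the PR's implementation:
`io.BytesIO` stands in for the `OutputStream`, and `zlib.compress` on each
batch stands in for Parquet writing one compressed row group per batch. The
function name `roll_batches` and both stand-ins are assumptions made for
illustration.

```python
import io
import zlib
from typing import Iterable, Iterator, List


def roll_batches(batches: Iterable[bytes], target_file_size: int) -> List[bytes]:
    """Pack batches into 'files', rolling over on compressed bytes written.

    Hypothetical stand-in: each batch is compressed on its own (like one
    Parquet row group) and appended to an in-memory sink.
    """
    it: Iterator[bytes] = iter(batches)
    files: List[bytes] = []
    for first in it:  # one outer iteration per output file
        sink = io.BytesIO()  # stand-in for the output stream
        sink.write(zlib.compress(first))  # every file holds at least one batch
        # sink.tell() counts compressed bytes, not the uncompressed input size
        while sink.tell() < target_file_size:
            try:
                batch = next(it)
            except StopIteration:
                break
            sink.write(zlib.compress(batch))
        files.append(sink.getvalue())
    return files


if __name__ == "__main__":
    demo = roll_batches([b"a" * 10_000] * 50, target_file_size=100)
    print(len(demo), [len(f) for f in demo])
```

As in the excerpt, the size check runs before each additional batch, so a file
can overshoot the target by at most one compressed batch, and only the last
file can end up below it.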
# Memory profile
Streamed 1,000 batches × 5,000 rows × 108 bytes per row (≈ **515 MiB
uncompressed**, **390 MiB on disk** after zstd of random alphanumeric payload,
**24 files** written at `write.target-file-size-bytes = 16 MiB`) against AWS
Glue + S3 (Aircall staging). Process RSS sampled at 19 Hz from a background
thread. Detailed analysis below.
| Metric | Value |
|---|---|
| Workload (uncompressed → on-disk) | 515 MiB → 390 MiB (1.3× zstd) |
| Files written / row groups | 24 / 5,024 |
| Wall time | 301 s (rate-limited by Python random-payload generation, not
the writer) |
| **Baseline RSS** | 178 MiB |
| **Peak RSS** | **236 MiB** (delta +58 MiB; reached at t ≈ 15 s during the
first file) |
| Steady-state mean RSS | 167 MiB (≈ 10 MiB below baseline once Python GC
reclaims import overhead) |
| Steady-state p95 RSS | 229 MiB |
| Steady-state σ | 18 MiB |
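The sampling script itself is not part of this description. As a rough idea of
how such numbers can be collected, here is a hypothetical sketch of a
background-thread sampler plus the summary statistics reported in the table.
`RssSampler`, `summarise`, and the injectable `probe` callable are illustrative
names, not code from this PR, and the nearest-rank p95 is only one of several
possible percentile definitions.

```python
import statistics
import threading
import time
from typing import Callable, Dict, List


class RssSampler:
    """Call `probe` at a fixed rate from a background thread.

    `probe` is an assumption: any zero-argument callable returning the current
    RSS, e.g. one built on psutil's Process().memory_info().rss.
    """

    def __init__(self, probe: Callable[[], float], hz: float = 19.0) -> None:
        self._probe = probe
        self._period = 1.0 / hz
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.samples: List[float] = []

    def _run(self) -> None:
        while not self._stop.is_set():
            self.samples.append(self._probe())
            self._stop.wait(self._period)  # sleeps, but wakes early on stop

    def __enter__(self) -> "RssSampler":
        self._thread.start()
        return self

    def __exit__(self, *exc: object) -> None:
        self._stop.set()
        self._thread.join()


def summarise(samples: List[float]) -> Dict[str, float]:
    """Peak, mean, nearest-rank p95 and population standard deviation."""
    if not samples:
        raise ValueError("no samples collected")
    ordered = sorted(samples)
    p95 = ordered[min(len(ordered) - 1, int(0.95 * len(ordered)))]
    return {
        "peak": ordered[-1],
        "mean": statistics.fmean(ordered),
        "p95": p95,
        "sigma": statistics.pstdev(ordered),
    }
```

Wrapping the write in `with RssSampler(probe) as sampler:` and calling
`summarise(sampler.samples)` afterwards would yield figures of the same shape
as the table.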
The key observation: after the initial ramp during the first file, **RSS
oscillates within a ~30 MiB band across all 24 file rollovers and shows no
growth from start to finish**. Memory is bounded by the in-flight `RecordBatch`
+ Parquet page buffer + multipart upload pool — independent of
`target_file_size`, dataset size, or number of files produced. The buffered
approach from #3335 held up to `target_file_size × N_workers` of uncompressed
Arrow buffers (≈ 4 GiB at defaults), **roughly 70× the +58 MiB peak delta
measured here**.
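The two bounds can be written out as arithmetic. The sketch below is a
back-of-the-envelope model, not a measurement. The 8 workers and 512 MiB target
are assumed values chosen because they reproduce the ~4 GiB quoted above, and
the 3-column schema is purely hypothetical; the 1 MiB page buffer, 5 MiB part
size, and 8 in-flight parts are the approximate figures given in this
description.

```python
MIB = 1024 * 1024


def buffered_peak_bytes(n_workers: int, target_file_size: int) -> int:
    """Upper bound for the buffered approach: one full bin per worker."""
    return n_workers * target_file_size


def streaming_peak_bytes(
    batch_bytes: int,
    n_columns: int,
    page_buffer_bytes: int = 1 * MIB,  # approximate per-column page buffer
    part_bytes: int = 5 * MIB,         # approximate multipart part size
    parts_in_flight: int = 8,          # approximate upload concurrency
) -> int:
    """Rough bound for the rolling writer: batch + page buffers + uploads."""
    return batch_bytes + n_columns * page_buffer_bytes + part_bytes * parts_in_flight


# Assumed defaults (8 workers, 512 MiB target) and a hypothetical 3-column schema.
buffered = buffered_peak_bytes(n_workers=8, target_file_size=512 * MIB)
streaming = streaming_peak_bytes(batch_bytes=5_000 * 108, n_columns=3)

print(f"buffered bound:  {buffered / MIB:.0f} MiB")
print(f"streaming bound: {streaming / MIB:.1f} MiB")
```

The streaming bound has no `target_file_size` term, which is the point of the
change.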
<img width="1200" height="600" alt="smoke_memory_PR2"
src="https://github.com/user-attachments/assets/4526d52b-3035-4402-b37b-2807c4d52023"
/>
# Throughput / parallelism
Streaming writes are sequential: one rolling file at a time. Single-stream
throughput is bounded by the underlying multipart upload pool (~8 concurrent S3
PUTs in `pyarrow.fs.S3FileSystem`). That pool saturates typical network links
and is rarely the bottleneck for streaming pipelines, where the upstream source
(DB cursor, API, queue) is usually the limit. Callers wanting maximum write
throughput (backfills, dataset migrations) can materialise the data as a
`pa.Table` and call `tx.append(pa.Table)`. That path keeps its existing
`executor.map`-based file-level parallelism, which this PR leaves **completely
unchanged**.
A hybrid worker pool for the streaming path (N concurrent rolling writers
fed by a queue) is a possible follow-up if real workloads show streaming write
throughput to be a bottleneck. This mirrors the path taken by iceberg-go, which
has shipped single-writer-only streaming since April 2025 (iceberg-go#369)
without follow-up demand.
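For reference, the hybrid pool mentioned above could look roughly like this. It
is a speculative stdlib-only sketch of the idea, not something this PR
implements: `parallel_roll` is a hypothetical name, `io.BytesIO` and
`zlib.compress` stand in for the output stream and the Parquet writer, and
error handling is omitted.

```python
import io
import queue
import threading
import zlib
from typing import Iterable, List, Optional

_DONE = object()  # sentinel telling a worker to finish


def parallel_roll(
    batches: Iterable[bytes], target_file_size: int, n_writers: int = 2
) -> List[bytes]:
    """Feed batches through a bounded queue to N independent rolling writers."""
    work: "queue.Queue[object]" = queue.Queue(maxsize=2 * n_writers)
    files: List[bytes] = []
    lock = threading.Lock()

    def worker() -> None:
        sink: Optional[io.BytesIO] = None
        while True:
            item = work.get()
            if item is _DONE:
                break
            if sink is None:
                sink = io.BytesIO()  # start a new rolling file lazily
            sink.write(zlib.compress(item))  # type: ignore[arg-type]
            if sink.tell() >= target_file_size:  # roll on compressed bytes
                with lock:
                    files.append(sink.getvalue())
                sink = None
        if sink is not None:  # flush the partially filled last file
            with lock:
                files.append(sink.getvalue())

    threads = [threading.Thread(target=worker) for _ in range(n_writers)]
    for t in threads:
        t.start()
    for batch in batches:
        work.put(batch)  # blocks when the queue is full
    for _ in threads:
        work.put(_DONE)
    for t in threads:
        t.join()
    return files
```

The bounded queue gives back-pressure, so at most `2 × n_writers` batches wait
in memory; the trade-off is up to `n_writers` undersized trailing files instead
of one.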
# Properties honored
The streaming path honors the same parquet writer properties as the
materialised `pa.Table` path:
| Property | In Iceberg spec? | Honored by this PR? |
|---|---|---|
| `write.target-file-size-bytes` | ✅ | ✅ via `OutputStream.tell()`, on-disk
compressed bytes |
| `write.parquet.compression-codec` / `compression-level` | ✅ | ✅ |
| `write.parquet.page-size-bytes` | ✅ | ✅ |
| `write.parquet.page-row-limit` | ✅ | ✅ |
| `write.parquet.dict-size-bytes` | ✅ | ✅ |
| `write.parquet.row-group-limit` (pyiceberg-internal) | ❌ (not in spec) | ✅
as a per-row-group cap, identical treatment to materialised path |
| `write.parquet.row-group-size-bytes` | ✅ | ❌ pre-existing pyiceberg-wide
gap, warned by `_get_parquet_writer_kwargs` for both paths; out of scope here |
| `write.parquet.bloom-filter-*` | ✅ | ❌ pre-existing pyiceberg-wide gap |
When someone fixes `write.parquet.row-group-size-bytes` for pyiceberg, both
write paths benefit. This PR deliberately doesn't touch it, since the gap
predates this PR series.
# Code duplication note
`_record_batches_to_data_files` shares some boilerplate with the
materialised `write_file`'s nested `write_parquet` closure:
parquet-writer-kwargs / row-group-size / location-provider extraction,
`file_schema` selection, and `DataFile` construction from Parquet metadata. The
shared module-level helpers (`_get_parquet_writer_kwargs`,
`_to_requested_schema`, `data_file_statistics_from_parquet_metadata`, etc.) are
reused, but the "compose these helpers in the standard pattern" wrapper lives
independently in each path.
Extraction is mechanical (~100 lines of pure refactor) but I'd prefer to
land it as a standalone follow-up PR — it touches the existing `write_file`
closure which I'd rather not modify in the same PR as the new streaming
implementation.
# Are these changes tested?
Yes, at four layers.
## 1. End-to-end behaviour tests (no Docker)
`tests/catalog/test_catalog_behaviors.py` — 10 tests parametrised across all
three in-process catalog backends (`memory`, `sql`, `sql_without_rowcount`) →
**30 test runs**:
- `test_append_record_batch_reader` — basic append round-trip.
- `test_append_record_batch_reader_microbatched` — multi-file rollover via
`target-file-size-bytes=1`.
- `test_append_record_batch_reader_row_group_limit_is_cap` — feeds a single
1,000-row batch, sets `row-group-limit=250`, asserts the resulting Parquet has
exactly 4 row groups of 250 rows each (verified by reading the Parquet footer
with `pq.read_metadata`).
- `test_append_record_batch_reader_target_file_size_is_on_disk_bytes` — sets
`target-file-size-bytes=32 KiB`, streams ~12 MiB, asserts each rolled file is
between 0.5× and 5× the target. Catches regression to the old
uncompressed-Arrow-bytes behaviour (which would produce files ~3-10× *smaller*
than target).
- `test_append_record_batch_reader_empty` — empty reader produces zero data
files.
- `test_overwrite_record_batch_reader` — overwrite via reader replaces
existing rows.
- `test_append_record_batch_reader_to_partitioned_table_raises` —
partitioned-table input raises `NotImplementedError`.
- `test_append_invalid_input_type_raises` — non-Arrow input rejected.
- `test_record_batch_reader_consumed_exactly_once` — reader generator
drained once; no double-pass regression.
- `test_record_batch_reader_schema_mismatch_writes_no_files` — schema
mismatch fails before any data files are written (no orphan files).
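The row-group arithmetic behind
`test_append_record_batch_reader_row_group_limit_is_cap` is simple enough to
spell out. This helper is an illustrative stand-in (the name
`row_group_row_counts` is hypothetical); it only models how a per-row-group cap
divides one batch, and does not call pyarrow.

```python
from typing import List


def row_group_row_counts(batch_rows: int, row_group_limit: int) -> List[int]:
    """Row counts of the row groups one batch is split into under a cap."""
    if row_group_limit <= 0:
        raise ValueError("row_group_limit must be positive")
    full_groups, remainder = divmod(batch_rows, row_group_limit)
    counts = [row_group_limit] * full_groups
    if remainder:
        counts.append(remainder)  # the last group holds the leftover rows
    return counts


# The case exercised by the test: 1,000 rows with a cap of 250.
print(row_group_row_counts(1_000, 250))
```

A batch smaller than the cap stays a single row group, which is why the cap
does not merge small batches together.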
## 2. Spark integration tests
`tests/integration/test_writes/test_writes.py` has 3 tests, each run against
format versions v1 and v2 (6 test runs), proving Spark can read tables written
via the streaming path against the docker-compose stack:
- `test_append_record_batch_reader[1, 2]`
- `test_overwrite_record_batch_reader[1, 2]`
- `test_append_record_batch_reader_multifile[1, 2]`
## 3. Local CI sweep
- `make test` (full unit suite): **3,650 passed**, 0 failed, lint + mypy +
pydocstyle clean.
- `make test-integration` (full Spark integration suite on fresh
docker-compose): **396 passed, 1 skipped, 0 failed** in 3:47.
## 4. Real-stack smoke test on AWS
Verified end-to-end against AWS Glue + S3 in our staging account:
- 5M-row × 108 byte streaming append (515 MiB uncompressed → 390 MiB on disk
in 24 files), 19 Hz RSS sampling — peak 236 MiB, mean 167 MiB, no growth from
start to finish (chart above).
- The same scripts that backed PR #3335
(`smoke_test_record_batch_reader.py`, `smoke_test_athena_readback.py`) pass
unchanged on this branch — Athena `COUNT(*)` and `MAX(id)` match input on a
streaming-written table.
# Are there any user-facing changes?
Effectively none beyond what #3335 already introduced — this PR changes
internals only:
- **`write.target-file-size-bytes` semantics tighten for streaming inputs.**
A user who set this property on a streaming-write workflow under #3335 was
getting files 3-10× smaller than configured (uncompressed Arrow bytes proxy).
With this PR the property now reflects actual on-disk compressed bytes — files
become correspondingly larger. This is a net win and matches the spec, but
worth noting for anyone who calibrated batch sizes around the old behaviour.
- **`bin_pack_record_batches` helper removed** from `pyiceberg.io.pyarrow`.
It was added in #3335 (so it's never been in a release) and its only consumer
was `_dataframe_to_data_files`'s streaming branch, which is now restructured.
The public `Table.append(reader)` / `Table.overwrite(reader)` API and its
docstring guarantees are unchanged.
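For anyone who did tune the property under #3335, a rough conversion follows
from the two interpretations: the old path produced files of about
`target / compression_ratio` on disk, while the new path produces files of
about `target`. A minimal sketch, assuming you have measured your own
compression ratio (the function name is hypothetical and per-file overshoot is
ignored):

```python
MIB = 1024 * 1024


def equivalent_target_after_pr(old_target_bytes: int, compression_ratio: float) -> int:
    """Target that keeps on-disk file sizes roughly where they were under #3335.

    compression_ratio = uncompressed Arrow bytes / compressed Parquet bytes,
    measured on your own data (an input you must supply).
    """
    if compression_ratio <= 0:
        raise ValueError("compression_ratio must be positive")
    return int(old_target_bytes / compression_ratio)


# Example with an assumed 4x ratio, inside the 3-10x range quoted above.
print(equivalent_target_after_pr(512 * MIB, 4.0) // MIB, "MiB")
```

Leaving the property untouched is also fine; files simply grow to the
configured size.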
# Related
- Builds on #3335 (must land first)
- Uses `OutputStream.tell()` from #2998 (already merged)
- Closes the byte-semantics caveat documented in #3335
- Reference implementation: iceberg-go#369 (single-writer streaming, same
model)
- Aligned with Java/Spark/Flink interpretation of
`write.target-file-size-bytes`
- Tracks https://github.com/apache/iceberg-python/issues/2152
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]