robreeves commented on code in PR #3046:
URL: https://github.com/apache/iceberg-python/pull/3046#discussion_r2823352133
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1710,6 +1685,76 @@ def _read_all_delete_files(io: FileIO, tasks:
Iterable[FileScanTask]) -> dict[st
return deletes_per_file
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+ tasks: list[FileScanTask],
+ batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+ concurrent_files: int,
+ max_buffered_batches: int = 16,
+) -> Generator[pa.RecordBatch, None, None]:
+ """Read batches from multiple files concurrently with bounded memory.
+
+ Uses a per-scan ThreadPoolExecutor(max_workers=concurrent_files) to
naturally
+ bound concurrency. Workers push batches into a bounded queue which provides
+ backpressure when the consumer is slower than the producers.
+
+ Args:
+ tasks: The file scan tasks to process.
+ batch_fn: A callable that takes a FileScanTask and returns an iterator
of RecordBatches.
+ concurrent_files: Maximum number of files to read concurrently.
+ max_buffered_batches: Maximum number of batches to buffer in the queue.
+ """
+ if not tasks:
+ return
+
+ batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] =
queue.Queue(maxsize=max_buffered_batches)
+ cancel = threading.Event()
+ remaining = len(tasks)
+ remaining_lock = threading.Lock()
+
+ def worker(task: FileScanTask) -> None:
+ nonlocal remaining
+ try:
+ for batch in batch_fn(task):
+ if cancel.is_set():
+ return
+ batch_queue.put(batch)
+ except BaseException as e:
+ if not cancel.is_set():
+ batch_queue.put(e)
+ finally:
+ with remaining_lock:
+ remaining -= 1
+ if remaining == 0:
+ batch_queue.put(_QUEUE_SENTINEL)
+
+ with ThreadPoolExecutor(max_workers=concurrent_files) as executor:
+ for task in tasks:
+ executor.submit(worker, task)
+
+ try:
+ while True:
+ item = batch_queue.get()
+
+ if item is _QUEUE_SENTINEL:
+ break
+
+ if isinstance(item, BaseException):
+ raise item
+
+ yield item
+ finally:
+ cancel.set()
+ # Drain the queue to unblock any workers stuck on put()
+ while not batch_queue.empty():
Review Comment:
This could cause a worker to hang when `concurrent_files > 1` and
`max_buffered_batches=1`. Here is an example.
**Starting state**
max_buffered_batches=1, concurrent_files=3. Queue is full (1 item). Workers
A, B, and C are all blocked on batch_queue.put().
**Timeline**
| Step | Main thread | Workers A, B, C |
|------|-------------|-----------------|
| 1 | `cancel.set()` | |
| 2 | `get_nowait()` → removes 1 item. Queue: 0. Internally notifies
Worker A | Worker A: woken but hasn't run yet |
| 3 | `empty()` → `True` (queue IS empty because A hasn't put yet).
**Exits drain loop.** | |
| 4 | `executor.__exit__()` → `shutdown(wait=True)`, joins all threads...
| Worker A runs, `put()` completes → Queue: 1. Checks cancel → returns. ✓ |
| 5 | **DEADLOCK** — waiting for B and C to finish | Workers B, C: still
blocked on `put()`. Queue is full, nobody will ever drain. |
**Fix**
In the worker use put with a timeout so it can check if the thread is
canceled periodically.
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1789,54 +1834,114 @@ def to_table(self, tasks: Iterable[FileScanTask]) ->
pa.Table:
return result
- def to_record_batches(self, tasks: Iterable[FileScanTask]) ->
Iterator[pa.RecordBatch]:
+ def to_record_batches(
+ self,
+ tasks: Iterable[FileScanTask],
+ batch_size: int | None = None,
+ order: ScanOrder = ScanOrder.TASK,
+ concurrent_files: int = 1,
+ ) -> Iterator[pa.RecordBatch]:
"""Scan the Iceberg table and return an Iterator[pa.RecordBatch].
Returns an Iterator of pa.RecordBatch with data from the Iceberg table
by resolving the right columns that match the current table schema.
Only data that matches the provided row_filter expression is returned.
+ Ordering semantics:
+ - ScanOrder.TASK (default): Batches are grouped by file in task
submission order.
+ - ScanOrder.ARRIVAL: Batches may be interleaved across files.
Within each file,
+ batch ordering follows row order.
+
Args:
tasks: FileScanTasks representing the data files and delete files
to read from.
+ batch_size: The number of rows per batch. If None, PyArrow's
default is used.
+ order: Controls the order in which record batches are returned.
+ ScanOrder.TASK (default) returns batches in task order, with
each task
+ fully materialized before proceeding to the next. Allows
parallel file
+ reads via executor. ScanOrder.ARRIVAL yields batches as they
are
+ produced without materializing entire files into memory.
+ concurrent_files: Number of files to read concurrently when
order=ScanOrder.ARRIVAL.
+ Must be >= 1. When > 1, batches may arrive interleaved across
files.
+ Ignored when order=ScanOrder.TASK.
Returns:
An Iterator of PyArrow RecordBatches.
Total number of rows will be capped if specified.
Raises:
ResolveError: When a required field cannot be found in the file
- ValueError: When a field type in the file cannot be projected to
the schema type
+ ValueError: When a field type in the file cannot be projected to
the schema type,
+ or when an invalid order value is provided, or when
concurrent_files < 1.
"""
- deletes_per_file = _read_all_delete_files(self._io, tasks)
+ if not isinstance(order, ScanOrder):
+ raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder
enum value (ScanOrder.TASK or ScanOrder.ARRIVAL).")
- total_row_count = 0
+ if concurrent_files < 1:
+ raise ValueError(f"concurrent_files must be >= 1, got
{concurrent_files}")
+
+ task_list, deletes_per_file = self._prepare_tasks_and_deletes(tasks)
+
+ if order == ScanOrder.ARRIVAL:
Review Comment:
If ARRIVAL is not used should it through an exception if `concurrent_files`
is set? It feels confusing to have it in the method params when it is not
always used. Could it be inside a ScanOrder class, where each ScanOrder has a
subclass with params specific to it?
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1710,6 +1685,76 @@ def _read_all_delete_files(io: FileIO, tasks:
Iterable[FileScanTask]) -> dict[st
return deletes_per_file
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+ tasks: list[FileScanTask],
+ batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+ concurrent_files: int,
+ max_buffered_batches: int = 16,
Review Comment:
I don't think this should be hardcoded. This means the only control a user
has over the memory is lowering the batch size or making files with less than
16 batches and setting `concurrent_files`. It feels like they have to be too in
the weeds to tune this.
Instead, WDYT you think of consolidating `max_buffered_batches` and
`concurrent_files`? Instead have a single `parallelism` config. It would be
used for the max worker count and number of batches. In the end the caller
still gets the same number of parallel batches loaded.
One downside is it could result in a large number of files open if they want
high parallelism. So a simpler alternative could be to expose
`max_buffered_batches` so a user can set it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]