sumedhsakdeo commented on code in PR #3046:
URL: https://github.com/apache/iceberg-python/pull/3046#discussion_r2825463403
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1789,54 +1834,109 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         return result

-    def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
+    def to_record_batches(
+        self,
+        tasks: Iterable[FileScanTask],
+        batch_size: int | None = None,
+        order: ScanOrder = TaskOrder(),
+    ) -> Iterator[pa.RecordBatch]:
         """Scan the Iceberg table and return an Iterator[pa.RecordBatch].

         Returns an Iterator of pa.RecordBatch with data from the Iceberg table
         by resolving the right columns that match the current table schema.
         Only data that matches the provided row_filter expression is returned.

+        Ordering semantics:
+        - TaskOrder() (default): Yields batches one file at a time in task submission order.
+        - ArrivalOrder(): Batches may be interleaved across files as they arrive.
+          Within each file, batch ordering follows row order.
+
         Args:
             tasks: FileScanTasks representing the data files and delete files to read from.
+            batch_size: The number of rows per batch. If None, PyArrow's default is used.
+            order: Controls the order in which record batches are returned.
+                TaskOrder() (default) yields batches one file at a time in task order.
+                ArrivalOrder(concurrent_streams=N, max_buffered_batches=M) yields batches
+                as they are produced without materializing entire files into memory.

         Returns:
             An Iterator of PyArrow RecordBatches.
             Total number of rows will be capped if specified.

         Raises:
             ResolveError: When a required field cannot be found in the file
-            ValueError: When a field type in the file cannot be projected to the schema type
+            ValueError: When a field type in the file cannot be projected to the schema type,
+                or when an invalid order value is provided, or when concurrent_streams < 1.
         """
-        deletes_per_file = _read_all_delete_files(self._io, tasks)
+        if not isinstance(order, ScanOrder):
+            raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder instance (TaskOrder() or ArrivalOrder()).")

-        total_row_count = 0
+        task_list, deletes_per_file = self._prepare_tasks_and_deletes(tasks)
+
+        if isinstance(order, ArrivalOrder):
+            if order.concurrent_streams < 1:
+                raise ValueError(f"concurrent_streams must be >= 1, got {order.concurrent_streams}")
+            return self._apply_limit(self._iter_batches_arrival(task_list, deletes_per_file, batch_size, order.concurrent_streams, order.max_buffered_batches))
+
+        return self._apply_limit(self._iter_batches_materialized(task_list, deletes_per_file, batch_size))
+
+    def _prepare_tasks_and_deletes(
+        self, tasks: Iterable[FileScanTask]
+    ) -> tuple[list[FileScanTask], dict[str, list[ChunkedArray]]]:
+        """Resolve delete files and return tasks as a list."""
+        task_list = list(tasks)
+        deletes_per_file = _read_all_delete_files(self._io, task_list)
+        return task_list, deletes_per_file
+
+    def _iter_batches_arrival(
+        self,
+        task_list: list[FileScanTask],
+        deletes_per_file: dict[str, list[ChunkedArray]],
+        batch_size: int | None,
+        concurrent_streams: int,
+        max_buffered_batches: int = 16,
Review Comment:
yep, refactoring this
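
For reference, a minimal usage sketch of the new ordering parameter, based only on the signature shown in the diff above. The import path for TaskOrder/ArrivalOrder and the way the ArrowScan and tasks are obtained are assumptions and may change with the refactor:

    from typing import Iterable, Iterator

    import pyarrow as pa

    from pyiceberg.io.pyarrow import ArrowScan, ArrivalOrder, TaskOrder  # assumed import path
    from pyiceberg.table import FileScanTask


    def batches_in_task_order(scan: ArrowScan, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
        # Default behavior: batches are yielded one file at a time, in task submission order.
        yield from scan.to_record_batches(tasks, batch_size=10_000, order=TaskOrder())


    def batches_as_they_arrive(scan: ArrowScan, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
        # ArrivalOrder: up to 4 files are read concurrently and batches may interleave
        # across files, bounded by a buffer of 16 batches (the default suggested in this PR).
        yield from scan.to_record_batches(
            tasks,
            order=ArrivalOrder(concurrent_streams=4, max_buffered_batches=16),
        )
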
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]