Fokko commented on code in PR #1995:
URL: https://github.com/apache/iceberg-python/pull/1995#discussion_r2159047608
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1570,47 +1567,20 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
         """
-        deletes_per_file = _read_all_delete_files(self._io, tasks)
-        executor = ExecutorFactory.get_or_create()
-
-        def _table_from_scan_task(task: FileScanTask) -> pa.Table:
-            batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
-            if len(batches) > 0:
-                return pa.Table.from_batches(batches)
-            else:
-                return None
-
-        futures = [
-            executor.submit(
-                _table_from_scan_task,
-                task,
-            )
-            for task in tasks
-        ]
-        total_row_count = 0
-        # for consistent ordering, we need to maintain future order
-        futures_index = {f: i for i, f in enumerate(futures)}
-        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
-        for future in concurrent.futures.as_completed(futures):
-            completed_futures.add(future)
-            if table_result := future.result():
-                total_row_count += len(table_result)
-            # stop early if limit is satisfied
-            if self._limit is not None and total_row_count >= self._limit:
-                break
-
-        # by now, we've either completed all tasks or satisfied the limit
-        if self._limit is not None:
-            _ = [f.cancel() for f in futures if not f.done()]
-
-        tables = [f.result() for f in completed_futures if f.result()]
-
         arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)
-        if len(tables) < 1:
+        batches = self.to_record_batches(tasks)
+        try:
+            first_batch = next(batches)
+        except StopIteration:
+            # Empty
             return pa.Table.from_batches([], schema=arrow_schema)

Review Comment:
```suggestion
            # Empty
            return arrow_schema.empty_table()
```

##########
pyiceberg/io/pyarrow.py:
##########
@@ -1644,7 +1611,32 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
             ValueError: When a field type in the file cannot be projected to the schema type
         """
         deletes_per_file = _read_all_delete_files(self._io, tasks)
-        return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
+
+        total_row_count = 0
+        executor = ExecutorFactory.get_or_create()
+
+        def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]:
+            # Materialize the iterator here to ensure execution happens within the executor.
+            # Otherwise, the iterator would be lazily consumed later (in the main thread),
+            # defeating the purpose of using executor.map.
+            return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+
+        limit_reached = False
+        for batches in executor.map(batches_for_task, tasks):
+            for batch in batches:
+                current_batch_size = len(batch)
+                if self._limit is not None and total_row_count + current_batch_size >= self._limit:
+                    yield batch.slice(0, self._limit - total_row_count)
+
+                    limit_reached = True
+                    break
+
+                yield batch
+                total_row_count += current_batch_size

Review Comment:
Nit, I know that it is the same in logic, but this makes it easier to read (for me at least, so feel free to ignore)
```suggestion
                else:
                    yield batch
                    total_row_count += current_batch_size
```
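For context on the first suggestion: `pa.Schema.empty_table()` is an existing pyarrow convenience that builds the same zero-row table as `pa.Table.from_batches([], schema=...)`, just more directly. A minimal sketch (not part of the PR) illustrating the equivalence:

```python
import pyarrow as pa

# Any schema works here; this one is purely illustrative.
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])

via_batches = pa.Table.from_batches([], schema=schema)
via_helper = schema.empty_table()

# Both construct a zero-row table with the identical schema.
assert via_helper.num_rows == via_batches.num_rows == 0
assert via_helper.equals(via_batches)
```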
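For context on the second hunk: the approach relies on two properties — `executor.map` yields results in input order (so batch ordering stays deterministic), and wrapping the read in `list(...)` inside the worker forces the otherwise-lazy iterator to run in the pool rather than in the consuming thread. A self-contained sketch under those assumptions, with hypothetical names (`read_batches` and `limited_batches` stand in for the PR's internals):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, List

import pyarrow as pa


def read_batches(task_id: int) -> List[pa.RecordBatch]:
    # Returning a list (not a generator) makes the "read" happen here,
    # in the worker thread, mirroring batches_for_task in the PR.
    return [pa.RecordBatch.from_pydict({"task": [task_id] * 4})]


def limited_batches(task_ids: List[int], limit: int) -> Iterator[pa.RecordBatch]:
    total = 0
    with ThreadPoolExecutor() as executor:
        # executor.map preserves input order, so output stays deterministic.
        for batches in executor.map(read_batches, task_ids):
            for batch in batches:
                if total + len(batch) >= limit:
                    # Trim the batch that crosses the limit, then stop.
                    yield batch.slice(0, limit - total)
                    return
                yield batch
                total += len(batch)


# Three tasks of 4 rows each, capped at 6 rows total: 4 + 2.
assert sum(len(b) for b in limited_batches([1, 2, 3], limit=6)) == 6
```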