Fokko commented on code in PR #1995:
URL: https://github.com/apache/iceberg-python/pull/1995#discussion_r2159047608


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1570,47 +1567,20 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
         """
-        deletes_per_file = _read_all_delete_files(self._io, tasks)
-        executor = ExecutorFactory.get_or_create()
-
-        def _table_from_scan_task(task: FileScanTask) -> pa.Table:
-            batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
-            if len(batches) > 0:
-                return pa.Table.from_batches(batches)
-            else:
-                return None
-
-        futures = [
-            executor.submit(
-                _table_from_scan_task,
-                task,
-            )
-            for task in tasks
-        ]
-        total_row_count = 0
-        # for consistent ordering, we need to maintain future order
-        futures_index = {f: i for i, f in enumerate(futures)}
-        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
-        for future in concurrent.futures.as_completed(futures):
-            completed_futures.add(future)
-            if table_result := future.result():
-                total_row_count += len(table_result)
-            # stop early if limit is satisfied
-            if self._limit is not None and total_row_count >= self._limit:
-                break
-
-        # by now, we've either completed all tasks or satisfied the limit
-        if self._limit is not None:
-            _ = [f.cancel() for f in futures if not f.done()]
-
-        tables = [f.result() for f in completed_futures if f.result()]
-
         arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)
 
-        if len(tables) < 1:
+        batches = self.to_record_batches(tasks)
+        try:
+            first_batch = next(batches)
+        except StopIteration:
+            # Empty
             return pa.Table.from_batches([], schema=arrow_schema)

Review Comment:
   ```suggestion
               # Empty
               return arrow_schema.empty_table()
   ```



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1644,7 +1611,32 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
             ValueError: When a field type in the file cannot be projected to the schema type
         """
         deletes_per_file = _read_all_delete_files(self._io, tasks)
-        return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
+
+        total_row_count = 0
+        executor = ExecutorFactory.get_or_create()
+
+        def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]:
+            # Materialize the iterator here to ensure execution happens within the executor.
+            # Otherwise, the iterator would be lazily consumed later (in the main thread),
+            # defeating the purpose of using executor.map.
+            return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+
+        limit_reached = False
+        for batches in executor.map(batches_for_task, tasks):
+            for batch in batches:
+                current_batch_size = len(batch)
+                if self._limit is not None and total_row_count + current_batch_size >= self._limit:
+                    yield batch.slice(0, self._limit - total_row_count)
+
+                    limit_reached = True
+                    break
+
+                yield batch
+                total_row_count += current_batch_size

Review Comment:
   Nit: I know the logic is the same, but putting the fall-through path in an
   explicit `else` branch makes it easier to read (for me at least, so feel free to ignore).
   ```suggestion
                   else:
                       yield batch
                       total_row_count += current_batch_size
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to