Fokko commented on code in PR #786:
URL: https://github.com/apache/iceberg-python/pull/786#discussion_r1624861038


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1005,36 +1004,42 @@ def _task_to_table(
             columns=[col.name for col in file_project_schema.columns],
         )
 
-        if positional_deletes:
-            # Create the mask of indices that we're interested in
-            indices = _combine_positional_deletes(positional_deletes, fragment.count_rows())
-
-            if limit:
-                if pyarrow_filter is not None:
-                    # In case of the filter, we don't exactly know how many rows
-                    # we need to fetch upfront, can be optimized in the future:
-                    # https://github.com/apache/arrow/issues/35301
-                    arrow_table = fragment_scanner.take(indices)
-                    arrow_table = arrow_table.filter(pyarrow_filter)
-                    arrow_table = arrow_table.slice(0, limit)
-                else:
-                    arrow_table = fragment_scanner.take(indices[0:limit])
-            else:
-                arrow_table = fragment_scanner.take(indices)
+        current_index = 0
+        batches = fragment_scanner.to_batches()
+        for batch in batches:
+            # Remember the original batch size: take() and the filter below
+            # can shrink the batch, while the delete window must keep
+            # advancing by the file's own row count
+            current_batch_size = len(batch)
+            if positional_deletes:
+                # Create the mask of indices that we're interested in
+                indices = _combine_positional_deletes(positional_deletes, current_index, current_index + current_batch_size)
+                batch = batch.take(indices)
                 # Apply the user filter
                 if pyarrow_filter is not None:
+                    # we need to switch back and forth between RecordBatch and Table
+                    # as Expression filter isn't yet supported in RecordBatch
+                    # https://github.com/apache/arrow/issues/39220
+                    arrow_table = pa.Table.from_batches([batch])
                     arrow_table = arrow_table.filter(pyarrow_filter)
-        else:
-            # If there are no deletes, we can just take the head
-            # and the user-filter is already applied
-            if limit:
-                arrow_table = fragment_scanner.head(limit)
-            else:
-                arrow_table = fragment_scanner.to_table()
+                    batch = arrow_table.to_batches()[0]
+            yield to_requested_schema(projected_schema, file_project_schema, batch)
+            current_index += current_batch_size
 
-        if len(arrow_table) < 1:
-            return None
-        return to_requested_schema(projected_schema, file_project_schema, arrow_table)
+
+def _task_to_table(
+    fs: FileSystem,
+    task: FileScanTask,
+    bound_row_filter: BooleanExpression,
+    projected_schema: Schema,
+    projected_field_ids: Set[int],
+    positional_deletes: Optional[List[ChunkedArray]],
+    case_sensitive: bool,
+    name_mapping: Optional[NameMapping] = None,
+) -> pa.Table:
+    batches = _task_to_record_batches(
+        fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping
+    )
+    return pa.Table.from_batches(batches, schema=schema_to_pyarrow(projected_schema))

Review Comment:
   This was exactly what I had in mind, looking good 👍 
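   For readers skimming the diff: positional deletes are file-global row
   numbers, so each batch only needs the slice of deletes that falls inside
   its own window, shifted back to batch-relative indices. A minimal
   standalone sketch of that windowing (illustrative names; a plain Python
   set stands in for pyiceberg's ChunkedArray-based deletes and for the
   internal _combine_positional_deletes helper):

       import pyarrow as pa

       def _keep_indices(deleted: set, start: int, end: int) -> pa.Array:
           # Batch-relative indices of the rows in [start, end) that are
           # not marked as deleted by the file-global positions
           return pa.array([i - start for i in range(start, end) if i not in deleted], type=pa.int64())

       def apply_deletes(batches, deleted: set):
           current_index = 0
           for batch in batches:
               n = len(batch)  # advance by the original size, before take() shrinks the batch
               yield batch.take(_keep_indices(deleted, current_index, current_index + n))
               current_index += n

   The same bookkeeping is why the loop in the diff has to advance
   current_index by the batch's pre-take() size.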

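   The RecordBatch/Table round-trip exists because pyarrow's Expression
   filters are only supported on Table (and Dataset), not on RecordBatch yet
   (https://github.com/apache/arrow/issues/39220). A small sketch of that
   workaround in isolation (illustrative helper name, public pyarrow APIs
   only):

       import pyarrow as pa
       import pyarrow.compute as pc

       def _filter_batch(batch: pa.RecordBatch, expr: pc.Expression) -> pa.RecordBatch:
           # Wrap the batch in a single-batch Table, filter, and unwrap again
           filtered = pa.Table.from_batches([batch]).filter(expr)
           # combine_chunks() folds the result into one chunk per column, so
           # to_batches() yields at most one RecordBatch; an all-filtered
           # result is returned as an empty slice to preserve the schema
           combined = filtered.combine_chunks()
           return combined.to_batches()[0] if combined.num_rows > 0 else batch.slice(0, 0)

       # e.g. _filter_batch(batch, pc.field("id") > 0)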

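   One detail worth noting in the new _task_to_table wrapper: passing an
   explicit schema to pa.Table.from_batches means an empty batch stream
   still produces a valid zero-row Table instead of failing schema
   inference. Assuming only public pyarrow APIs:

       import pyarrow as pa

       schema = pa.schema([("id", pa.int64())])
       empty = pa.Table.from_batches([], schema=schema)  # works: 0 rows, known schema
       # pa.Table.from_batches([]) without a schema raises a ValueError instead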

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

