kevinjqliu commented on code in PR #1621:
URL: https://github.com/apache/iceberg-python/pull/1621#discussion_r1948240540


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1359,14 +1359,11 @@ def _task_to_record_batches(
 
                 # Apply the user filter
                 if pyarrow_filter is not None:
-                    # we need to switch back and forth between RecordBatch and Table
-                    # as Expression filter isn't yet supported in RecordBatch
-                    # https://github.com/apache/arrow/issues/39220
-                    arrow_table = pa.Table.from_batches([batch])
-                    arrow_table = arrow_table.filter(pyarrow_filter)
-                    if len(arrow_table) == 0:
+                    if batch.num_rows == 0:
                         continue
-                    output_batches = arrow_table.to_batches()
+                    filtered_batch = batch.filter(pyarrow_filter)
+                    if filtered_batch.num_rows > 0:
+                        output_batches = [filtered_batch]

Review Comment:
   > so if there are no positional deletes then it will hit directly the `if pyarrow_filter is not None` line without checking if the batch is empty.
   
   right, i assume this will not generate empty batches
   ```
               next_index = next_index + len(batch)
               current_index = next_index - len(batch)
               # Start with original batch
               current_batch = batch
   ```
   
   we can also move 
   ```
                   # Skip empty batches
                   if current_batch.num_rows == 0:
                       continue
   
   ```
   outside the `if positional_deletes` branch, to catch both branches. 
   
   The use of `output_batches` here is kind of confusing.
   
   WDYT of
   
   ```
           batches = fragment_scanner.to_batches()
           for batch in batches:
               next_index = next_index + len(batch)
               current_index = next_index - len(batch)
               # Start with original batch
               current_batch = batch
               
               # Apply positional deletes if needed
               if positional_deletes:
                   # Create the mask of indices that we're interested in
                   indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
                   current_batch = current_batch.take(indices)
               
               # Skip empty batches
               if current_batch.num_rows == 0:
                   continue
               
               # Apply user filter if needed
               if pyarrow_filter is not None:
                   current_batch = current_batch.filter(pyarrow_filter)
                   # Skip empty batches
                   if current_batch.num_rows == 0:
                       continue
               
               # Yield the final processed batch
               yield _to_requested_schema(
                   projected_schema,
                   file_project_schema,
                   current_batch,
                   downcast_ns_timestamp_to_us=True,
                   use_large_types=use_large_types,
               )
   ```
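
To make the control flow above easier to check in isolation, here is a minimal sketch that models it with plain Python lists standing in for Arrow record batches. Everything in it is an assumption for illustration: `process_batches`, `row_filter`, and the dict-based rows are hypothetical names, positional deletes are modelled as a set of absolute row positions, and the schema projection step is left out. It is not the PyIceberg implementation.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Optional, Set

Row = Dict[str, int]  # hypothetical stand-in for one row of a record batch


def process_batches(
    batches: Iterable[List[Row]],
    positional_deletes: Optional[Set[int]] = None,
    row_filter: Optional[Callable[[Row], bool]] = None,
) -> Iterator[List[Row]]:
    """Yield only non-empty batches after applying deletes and the filter."""
    next_index = 0
    for batch in batches:
        # Track the absolute position of the first row in this batch
        next_index = next_index + len(batch)
        current_index = next_index - len(batch)
        current_batch = batch

        # Apply positional deletes if needed (absolute row positions)
        if positional_deletes:
            current_batch = [
                row
                for offset, row in enumerate(current_batch)
                if current_index + offset not in positional_deletes
            ]

        # Skip empty batches, whether or not deletes were applied
        if len(current_batch) == 0:
            continue

        # Apply the user filter if needed
        if row_filter is not None:
            current_batch = [row for row in current_batch if row_filter(row)]
            # Skip batches that the filter emptied
            if len(current_batch) == 0:
                continue

        yield current_batch


batches = [[{"id": 1}, {"id": 2}], [], [{"id": 3}], [{"id": 4}, {"id": 5}]]
deletes = {2}  # absolute position of the row with id 3
result = list(process_batches(batches, deletes, lambda row: row["id"] >= 2))
```

In this example the second input batch is empty from the start, and the third becomes empty once its only row is deleted; neither reaches the filter step. Having a single empty check after the delete branch covers both cases, which is the point of moving it outside `if positional_deletes`.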



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

