kevinjqliu commented on code in PR #1621: URL: https://github.com/apache/iceberg-python/pull/1621#discussion_r1948157569
########## pyiceberg/io/pyarrow.py: ########## @@ -1359,14 +1359,11 @@ def _task_to_record_batches( # Apply the user filter if pyarrow_filter is not None: - # we need to switch back and forth between RecordBatch and Table - # as Expression filter isn't yet supported in RecordBatch - # https://github.com/apache/arrow/issues/39220 - arrow_table = pa.Table.from_batches([batch]) - arrow_table = arrow_table.filter(pyarrow_filter) - if len(arrow_table) == 0: + if batch.num_rows == 0: continue - output_batches = arrow_table.to_batches() + filtered_batch = batch.filter(pyarrow_filter) + if filtered_batch.num_rows > 0: + output_batches = [filtered_batch] Review Comment: i think this is a bug. if `filtered_batch==0`, `output_batches` will be the value in L1358, which is not right we probably want another ``` if filtered_batch.num_rows == 0: continue ``` ########## pyiceberg/io/pyarrow.py: ########## @@ -1359,14 +1359,11 @@ def _task_to_record_batches( # Apply the user filter if pyarrow_filter is not None: - # we need to switch back and forth between RecordBatch and Table - # as Expression filter isn't yet supported in RecordBatch - # https://github.com/apache/arrow/issues/39220 - arrow_table = pa.Table.from_batches([batch]) - arrow_table = arrow_table.filter(pyarrow_filter) - if len(arrow_table) == 0: + if batch.num_rows == 0: continue - output_batches = arrow_table.to_batches() + filtered_batch = batch.filter(pyarrow_filter) + if filtered_batch.num_rows > 0: + output_batches = [filtered_batch] Review Comment: the use of `output_batches` here is kind of confusing. WDYT of ``` batches = fragment_scanner.to_batches() for batch in batches: next_index = next_index + len(batch) current_index = next_index - len(batch) # Start with original batch current_batch = batch # Apply positional deletes if needed if positional_deletes: # Create the mask of indices that we're interested in indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch)) current_batch = current_batch.take(indices) # Skip empty batches if current_batch.num_rows == 0: continue # Apply user filter if needed if pyarrow_filter is not None: current_batch = current_batch.filter(pyarrow_filter) # Skip empty batches if current_batch.num_rows == 0: continue # Yield the final processed batch yield _to_requested_schema( projected_schema, file_project_schema, current_batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types, ) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org