Fokko commented on code in PR #786:
URL: https://github.com/apache/iceberg-python/pull/786#discussion_r1624861038
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1005,36 +1004,42 @@ def _task_to_table(
             columns=[col.name for col in file_project_schema.columns],
         )

-        if positional_deletes:
-            # Create the mask of indices that we're interested in
-            indices = _combine_positional_deletes(positional_deletes, fragment.count_rows())
-
-            if limit:
-                if pyarrow_filter is not None:
-                    # In case of the filter, we don't exactly know how many rows
-                    # we need to fetch upfront, can be optimized in the future:
-                    # https://github.com/apache/arrow/issues/35301
-                    arrow_table = fragment_scanner.take(indices)
-                    arrow_table = arrow_table.filter(pyarrow_filter)
-                    arrow_table = arrow_table.slice(0, limit)
-                else:
-                    arrow_table = fragment_scanner.take(indices[0:limit])
-            else:
-                arrow_table = fragment_scanner.take(indices)
+        current_index = 0
+        batches = fragment_scanner.to_batches()
+        for batch in batches:
+            # Deletes and the row filter below both shrink `batch`, so record
+            # its original length: `current_index` must advance by the number
+            # of rows read from the file, not by the number of rows kept.
+            batch_length = len(batch)
+            if positional_deletes:
+                # Create the mask of indices that we're interested in
+                indices = _combine_positional_deletes(positional_deletes, current_index, current_index + batch_length)
+                batch = batch.take(indices)

             # Apply the user filter
             if pyarrow_filter is not None:
+                # we need to switch back and forth between RecordBatch and Table
+                # as Expression filter isn't yet supported in RecordBatch
+                # https://github.com/apache/arrow/issues/39220
+                arrow_table = pa.Table.from_batches([batch])
                 arrow_table = arrow_table.filter(pyarrow_filter)
-        else:
-            # If there are no deletes, we can just take the head
-            # and the user-filter is already applied
-            if limit:
-                arrow_table = fragment_scanner.head(limit)
-            else:
-                arrow_table = fragment_scanner.to_table()
+                # to_batches() on an empty table yields no batches at all,
+                # so guard against a filter that drops every row
+                filtered_batches = arrow_table.to_batches()
+                batch = filtered_batches[0] if filtered_batches else batch.slice(0, 0)
+            yield to_requested_schema(projected_schema, file_project_schema, batch)
+            current_index += batch_length

-        if len(arrow_table) < 1:
-            return None
-        return to_requested_schema(projected_schema, file_project_schema, arrow_table)
+
+def _task_to_table(
+    fs: FileSystem,
+    task: FileScanTask,
+    bound_row_filter: BooleanExpression,
+    projected_schema: Schema,
+    projected_field_ids: Set[int],
+    positional_deletes: Optional[List[ChunkedArray]],
+    case_sensitive: bool,
+    name_mapping: Optional[NameMapping] = None,
+) -> pa.Table:
+    batches = _task_to_record_batches(
+        fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping
+    )
+    return pa.Table.from_batches(batches, schema=schema_to_pyarrow(projected_schema))

Review Comment:
   This was exactly what I had in mind, looking good 👍

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
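
A side note for readers following the diff: the loop derives per-batch take indices from file-global delete positions. The sketch below shows one way that windowing can work, under the assumption that the deletes arrive as a ChunkedArray of row positions; the helper name deletes_to_take_indices and the NumPy masking are illustrative stand-ins, not pyiceberg's actual _combine_positional_deletes.

import numpy as np
import pyarrow as pa

def deletes_to_take_indices(deletes: pa.ChunkedArray, start: int, end: int) -> pa.Array:
    # File-global positions of deleted rows that fall inside this batch's
    # window [start, end), shifted to batch-local coordinates.
    positions = deletes.to_numpy()
    local = positions[(positions >= start) & (positions < end)] - start
    # Keep every batch-local index that is not deleted.
    keep = np.ones(end - start, dtype=bool)
    keep[local] = False
    return pa.array(np.flatnonzero(keep))

# A batch covering file rows 100..103, with file rows 101 and 103 deleted:
deletes = pa.chunked_array([[50, 101, 103, 990]])
print(deletes_to_take_indices(deletes, 100, 104))  # -> [0, 2]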
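
The "switch back and forth between RecordBatch and Table" comment is the workaround for apache/arrow#39220: at the time of this PR, RecordBatch.filter did not accept a compute Expression, while Table.filter does. A minimal sketch of that round-trip (filter_batch is a hypothetical name, and the empty-result guard is one way a caller might handle a filter that drops every row):

import pyarrow as pa
import pyarrow.compute as pc

def filter_batch(batch: pa.RecordBatch, expr: pc.Expression) -> pa.RecordBatch:
    # Wrap the batch in a single-chunk Table, which does accept Expressions.
    filtered = pa.Table.from_batches([batch]).filter(expr)
    # An empty table yields no batches at all, so fall back to an empty
    # slice rather than indexing into an empty list.
    table_batches = filtered.to_batches()
    return table_batches[0] if table_batches else batch.slice(0, 0)

batch = pa.RecordBatch.from_pydict({"x": [1, 2, 3, 4]})
print(filter_batch(batch, pc.field("x") > 2).to_pydict())  # {'x': [3, 4]}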
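
Lastly, the rewritten _task_to_table materializes the lazy batches with pa.Table.from_batches, passing schema_to_pyarrow(projected_schema) explicitly. A small sketch of why the explicit schema matters: from_batches can consume a generator, and with a schema supplied it still returns a valid zero-row table when the generator yields nothing (for example, when every row was deleted or filtered out). The record_batches generator below is illustrative only.

import pyarrow as pa

schema = pa.schema([("id", pa.int64())])

def record_batches():
    # Stand-in for _task_to_record_batches: yield batches lazily.
    for lo in (0, 2, 4):
        yield pa.RecordBatch.from_pydict({"id": [lo, lo + 1]}, schema=schema)

table = pa.Table.from_batches(record_batches(), schema=schema)
assert table.num_rows == 6

# With the schema given, an exhausted iterator still produces a valid,
# zero-row table instead of raising for lack of a schema.
empty = pa.Table.from_batches(iter(()), schema=schema)
assert empty.num_rows == 0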