rdblue commented on code in PR #6775: URL: https://github.com/apache/iceberg/pull/6775#discussion_r1111320361
########## python/pyiceberg/io/pyarrow.py: ########## @@ -546,11 +568,45 @@ def project_table( id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) }.union(extract_field_ids(bound_row_filter)) + tasks_data_files: List[FileScanTask] = [] + tasks_positional_deletes: List[FileScanTask] = [] + for task in tasks: + if task.file.content == DataFileContent.DATA: + tasks_data_files.append(task) + elif task.file.content == DataFileContent.POSITION_DELETES: + tasks_positional_deletes.append(task) + elif task.file.content == DataFileContent.EQUALITY_DELETES: + raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") + else: + raise ValueError(f"Unknown file content: {task.file.content}") + with ThreadPool() as pool: + positional_deletes_per_file: Dict[str, List[pa.ChunkedArray]] = {} + if tasks_positional_deletes: + # If there are any positional deletes, get those first + for delete_files in pool.starmap( + func=_read_deletes, + iterable=[(fs, task.file.file_path) for task in tasks_positional_deletes], + ): + for file, buffer in delete_files.items(): + positional_deletes_per_file[file] = positional_deletes_per_file.get(file, []) + [buffer] Review Comment: I think this needs to take sequence number into account. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org