DieHertz opened a new issue, #1229: URL: https://github.com/apache/iceberg-python/issues/1229
### Feature Request / Improvement

Creating from https://github.com/apache/iceberg-python/pull/614#issuecomment-2375186118 as suggested.

I have a couple of moderately heavy tables with millions of Parquet files and ~100 manifest files, which I routinely need to query via the `.files` metatable. The existing implementation walks the manifests sequentially and takes quite a while to construct the resulting `pyarrow.Table`.

Since reporting the issue in #614 I have spent a while thinking about it and came up with a temporary solution that works in my case: a `ProcessPoolExecutor` distributes the manifests each to their own mapper, each mapper returns a `pyarrow.RecordBatch`, and `pyarrow.Table.from_batches()` assembles the resulting table. Ideally I would like to process Avro manifests on a per-block basis to speed things up further, but doing so with the `ProcessPoolExecutor` seems to come with too much overhead.

Before writing my custom code, `.files()` took ~70 seconds on a table with 350k files and around ~200 seconds on a 1M-file table. The code below takes less than 5 seconds, though it is not an apples-to-apples comparison because I only extract the columns I need.

Here's what I did in my particular case, to give the general idea:

```python
from concurrent.futures import ProcessPoolExecutor
from functools import partial

import pyarrow as pa
import pyiceberg.table
from pyiceberg.conversions import from_bytes
from pyiceberg.io import FileIO
from pyiceberg.manifest import FileFormat, ManifestEntryStatus, ManifestFile
from pyiceberg.schema import Schema

_FILES_SCHEMA = pa.schema(
    [
        pa.field('path', pa.string(), nullable=False),
        pa.field('rover', pa.string(), nullable=False),
        pa.field('publish_ts_wall_min', pa.int64(), nullable=False),
        pa.field('publish_ts_wall_max', pa.int64(), nullable=False),
    ]
)


def get_message_table_files(
    table: pyiceberg.table.Table,
) -> pa.Table:
    schema = table.metadata.schema()
    snapshot = table.current_snapshot()
    if not snapshot:
        return pa.Table.from_pylist(
            [],
            schema=_FILES_SCHEMA,
        )

    # Fan out one manifest per worker process; each worker returns a
    # RecordBatch, which from_batches() stitches into the final table.
    with ProcessPoolExecutor() as pool:
        return pa.Table.from_batches(
            pool.map(
                partial(_process_manifest, schema, table.io),
                snapshot.manifests(table.io),
            ),
            schema=_FILES_SCHEMA,
        )


def _process_manifest(
    table_schema: Schema,
    io: FileIO,
    manifest: ManifestFile,
) -> pa.RecordBatch:
    publish_ts_wall_field = table_schema.find_field('publish_ts_wall')
    rows = [
        dict(
            path=entry.data_file.file_path,
            rover=entry.data_file.partition.rover,
            publish_ts_wall_min=from_bytes(
                publish_ts_wall_field.field_type,
                entry.data_file.lower_bounds.get(publish_ts_wall_field.field_id),
            ),
            publish_ts_wall_max=from_bytes(
                publish_ts_wall_field.field_type,
                entry.data_file.upper_bounds.get(publish_ts_wall_field.field_id),
            ),
        )
        for entry in manifest.fetch_manifest_entry(io)
        if entry.data_file.file_format == FileFormat.PARQUET
        and entry.status != ManifestEntryStatus.DELETED
    ]
    return pa.RecordBatch.from_pylist(
        rows,
        schema=_FILES_SCHEMA,
    )
```
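On the per-block idea mentioned above: pyiceberg manifests are ordinary Avro files, so a reader that exposes Avro data blocks, such as `fastavro.block_reader`, could in principle split one manifest into independently processable chunks. The following is only a minimal, unbenchmarked sketch of that direction; it assumes the `fastavro` package is available, and the helper name `iter_manifest_entry_blocks` is mine, not part of pyiceberg:

```python
# Rough sketch only: split a manifest into its Avro data blocks so each block
# could be handed to a separate worker. Assumes fastavro is installed; block
# sizes are up to whatever wrote the manifest, so any speedup is
# workload-dependent.
from fastavro import block_reader

from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestFile


def iter_manifest_entry_blocks(io: FileIO, manifest: ManifestFile):
    # block_reader yields one Block per Avro data block; iterating a block
    # yields the raw entry records as plain dicts (e.g.
    # record['data_file']['file_path']), not pyiceberg ManifestEntry objects.
    with io.new_input(manifest.manifest_path).open() as f:
        for block in block_reader(f):
            yield list(block)
```

Since the blocks arrive as raw dicts, a per-block worker would have to read fields like `record['data_file']['file_path']` directly instead of going through `ManifestEntry`; that, plus the pickling overhead of shipping each block to a worker process, is where my attempts at this so far have lost the gains.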