MarkusSintonen opened a new issue, #45352: URL: https://github.com/apache/arrow/issues/45352
### Describe the usage question you have. Please include as many useful details as possible. There doesn't seem to be anyway in IO interfaces to use async code to feed data into the parquet parsers. However there seems to be a hacky workaround which seems to work via using anonymous mmap and feeding the parser via that: ```python class MyAsyncReader(Protocol): async def parquet_size(self) -> int: ... # Size stored separately when writing elsewhere async def parquet_meta(self) -> bytes: ... # Metadata stored separately when writing elsewhere async def parquet_data(self, start_offset: int, end_offset: int) -> bytes: ... async def query(reader: MyAsyncReader, filter: Expression) -> Table: size = await reader.parquet_size() meta = await reader.parquet_meta() anon_mmap = mmap.mmap(-1, size) anon_mmap.seek(size - len(meta)) # Meta to tail anon_mmap.write(meta) frag = ParquetFileFormat().make_fragment(anon_mmap).subset(filter) first_row_col = frag.row_groups[0].metadata.column(0) last_row_col = frag.row_groups[-1].metadata.column(frag.metadata.num_columns - 1) start_offset = offset(first_row_col) end_offset = offset(last_row_col) + last_row_col.total_compressed_size anon_mmap.seek(start_offset) anon_mmap.write(await reader.parquet_data(start_offset, end_offset)) # Feed needed data for parser return frag.to_table() # Parse the subset of row groups def offset(meta: ColumnChunkMetaData) -> int: return ( min(meta.dictionary_page_offset, meta.data_page_offset) # Is there a better way to get this? if meta.dictionary_page_offset is not None else meta.data_page_offset ) ``` Is there any other way to feed data into the file parser externally? Using the anonymous mmap feels hacky to feed data into the parser. There are the [IO interfaces](https://arrow.apache.org/docs/python/memory.html#input-and-output) but none of these are suitable for async code. Also is there a better way to get the file offsets based on the filter-expression other than above? We can not rely on `ThreadPoolExecutor` (or `ProcessPoolExecutor`) for doing the blocking IO. We can not consume threads as the processing is heavily IO bound with very high level of concurrency. Where most of the work goes into waiting for the IO. So we can not consume threads to just wait for the IO. With the pure async IO code it is able to handle much higher level of concurrency as the work is not bound by the parquet-parsing. ### Component(s) Python -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@arrow.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org