MarkusSintonen opened a new issue, #45352:
URL: https://github.com/apache/arrow/issues/45352
### Describe the usage question you have. Please include as many useful
details as possible.
There doesn't seem to be any way in the IO interfaces to feed data into the Parquet parser from async code. However, there is a hacky workaround that seems to work: use an anonymous mmap and feed the parser through it:
```python
import mmap
from typing import Protocol

from pyarrow import Table
from pyarrow.dataset import Expression, ParquetFileFormat
from pyarrow.parquet import ColumnChunkMetaData


class MyAsyncReader(Protocol):
    async def parquet_size(self) -> int: ...  # Size stored separately when writing elsewhere
    async def parquet_meta(self) -> bytes: ...  # Metadata stored separately when writing elsewhere
    async def parquet_data(self, start_offset: int, end_offset: int) -> bytes: ...


async def query(reader: MyAsyncReader, filter: Expression) -> Table:
    size = await reader.parquet_size()
    meta = await reader.parquet_meta()
    anon_mmap = mmap.mmap(-1, size)
    anon_mmap.seek(size - len(meta))  # Meta to tail
    anon_mmap.write(meta)
    frag = ParquetFileFormat().make_fragment(anon_mmap).subset(filter)
    first_row_col = frag.row_groups[0].metadata.column(0)
    last_row_col = frag.row_groups[-1].metadata.column(frag.metadata.num_columns - 1)
    start_offset = offset(first_row_col)
    end_offset = offset(last_row_col) + last_row_col.total_compressed_size
    anon_mmap.seek(start_offset)
    anon_mmap.write(await reader.parquet_data(start_offset, end_offset))  # Feed needed data for parser
    return frag.to_table()  # Parse the subset of row groups


def offset(meta: ColumnChunkMetaData) -> int:
    return (
        min(meta.dictionary_page_offset, meta.data_page_offset)  # Is there a better way to get this?
        if meta.dictionary_page_offset is not None
        else meta.data_page_offset
    )
```
Is there any other way to feed data into the file parser externally? Using an anonymous mmap to feed the parser feels hacky. There are the [IO interfaces](https://arrow.apache.org/docs/python/memory.html#input-and-output), but none of them is suitable for async code. Also, is there a better way than the above to get the file offsets based on the filter expression?
We cannot rely on `ThreadPoolExecutor` (or `ProcessPoolExecutor`) for the blocking IO. The processing is heavily IO-bound with a very high level of concurrency, and most of the time goes into waiting on IO, so we cannot afford to tie up threads just to wait. With pure async IO the code can sustain a much higher level of concurrency, as the work is not bound by the Parquet parsing.
### Component(s)
Python