MarkusSintonen opened a new issue, #45352:
URL: https://github.com/apache/arrow/issues/45352

   ### Describe the usage question you have. Please include as many useful 
details as  possible.
   
   
   There doesn't seem to be anyway in IO interfaces to use async code to feed 
data into the parquet parsers. However there seems to be a hacky workaround 
which seems to work via using anonymous mmap and feeding the parser via that:
   ```python
   class MyAsyncReader(Protocol):
       async def parquet_size(self) -> int: ...  # Size stored separately when 
writing elsewhere
       async def parquet_meta(self) -> bytes: ...  # Metadata stored separately 
when writing elsewhere
       async def parquet_data(self, start_offset: int, end_offset: int) -> 
bytes: ...
   
   
   async def query(reader: MyAsyncReader, filter: Expression) -> Table:
       size = await reader.parquet_size()
       meta = await reader.parquet_meta()
   
       anon_mmap = mmap.mmap(-1, size)
       anon_mmap.seek(size - len(meta))  # Meta to tail
       anon_mmap.write(meta)
   
       frag = ParquetFileFormat().make_fragment(anon_mmap).subset(filter)
   
       first_row_col = frag.row_groups[0].metadata.column(0)
       last_row_col = 
frag.row_groups[-1].metadata.column(frag.metadata.num_columns - 1)
       start_offset = offset(first_row_col)
       end_offset = offset(last_row_col) + last_row_col.total_compressed_size
   
       anon_mmap.seek(start_offset)
       anon_mmap.write(await reader.parquet_data(start_offset, end_offset))  # 
Feed needed data for parser
       
       return frag.to_table()  # Parse the subset of row groups
   
   
   def offset(meta: ColumnChunkMetaData) -> int:
       return (
           min(meta.dictionary_page_offset, meta.data_page_offset)  # Is there 
a better way to get this?
           if meta.dictionary_page_offset is not None
           else meta.data_page_offset
       )
   ```
   
   Is there any other way to feed data into the file parser externally? Using 
the anonymous mmap feels hacky to feed data into the parser. There are the [IO 
interfaces](https://arrow.apache.org/docs/python/memory.html#input-and-output) 
but none of these are suitable for async code. Also is there a better way to 
get the file offsets based on the filter-expression other than above? 
   
   We can not rely on `ThreadPoolExecutor` (or `ProcessPoolExecutor`) for doing 
the blocking IO. We can not consume threads as the processing is heavily IO 
bound with very high level of concurrency. Where most of the work goes into 
waiting for the IO. So we can not consume threads to just wait for the IO. With 
the pure async IO code it is able to handle much higher level of concurrency as 
the work is not bound by the parquet-parsing.
   
   ### Component(s)
   
   Python


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to