kevinjqliu commented on PR #3046:
URL: https://github.com/apache/iceberg-python/pull/3046#issuecomment-4006737089

   I took some time to understand how ArrowScan is implemented and used today, and where the potential bottlenecks are. Here's a summary:
   
   ArrowScan has 2 public functions for reading: `to_table` and `to_record_batches`.
   - `to_table` materializes the entire table as an Arrow table
   - `to_record_batches` returns an iterator with the expectation of lazy materialization
   
   Let's focus on `to_record_batches`, since `to_table` can be implemented by consuming all of the record batches.
   We're looking at this problem both in terms of throughput (how much data can 
be read) and memory usage (how much data is buffered in memory).
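   As a rough sketch of that relationship (pure-Python stand-ins; the names mirror but are not the actual ArrowScan API), `to_table` reduces to draining `to_record_batches`:

   ```python
   from typing import Iterator, List

   # Stand-in for a record batch: a simple list of rows.
   Batch = List[int]

   def to_record_batches() -> Iterator[Batch]:
       # Lazily yields one batch at a time, mimicking the intent of
       # ArrowScan.to_record_batches.
       for start in range(0, 6, 2):
           yield list(range(start, start + 2))

   def to_table() -> List[Batch]:
       # to_table can be implemented by simply consuming all record batches.
       return list(to_record_batches())

   table = to_table()
   ```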
   
   ArrowScan can utilize multiple threads, via the `ExecutorFactory`, to spread tasks out across worker threads.
   I realized a few things while reviewing the original code:
   1. There's an implicit ordering of the record batches; the ordering is deterministic based on task order (task 0, task 1, ...). Batches from later tasks are not emitted before earlier tasks complete.
   2. Because of this strict ordering combined with multiple worker threads, there's inherent head-of-line blocking: later tasks may finish their work, but their results cannot be emitted until earlier tasks complete. This blocking diminishes ArrowScan's overall throughput when producing record batches.
   3. Each worker materializes its entire task/file in memory (via `list(self._record_batches_from_scan_tasks_and_deletes)`) before returning results. This causes higher-than-expected memory utilization and, imo, is a bug because it contradicts the lazy materialization expected from `to_record_batches`.
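   Point (3) can be illustrated with a minimal sketch (pure-Python stand-ins, not the actual implementation): wrapping the generator in `list()` forces the whole task into memory, while returning the iterator keeps it lazy:

   ```python
   from typing import Iterator, List

   def read_file_batches(num_batches: int) -> Iterator[List[int]]:
       # Stand-in for reading record batches from a single file/task:
       # each batch is produced lazily, one at a time.
       for i in range(num_batches):
           yield [i] * 3

   def worker_eager(num_batches: int) -> List[List[int]]:
       # Current behavior (point 3): list() forces every batch of the task
       # into memory before anything is returned.
       return list(read_file_batches(num_batches))

   def worker_lazy(num_batches: int) -> Iterator[List[int]]:
       # Expected lazy behavior: hand back the iterator so batches are only
       # materialized as the consumer asks for them.
       return read_file_batches(num_batches)

   eager = worker_eager(4)   # all 4 batches in memory at once
   lazy = worker_lazy(4)     # nothing has been read yet
   first = next(lazy)        # exactly one batch materialized so far
   ```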
   
   To summarize, the current implementation fans out tasks to different workers. Each worker reads/materializes its entire task/file in memory before returning record batches, but only the first worker's record batches are consumed while the other workers wait.
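   The head-of-line blocking can be reproduced with a small sketch (hypothetical `read_task`, not the actual ArrowScan code): consuming futures strictly in submission order makes a fast task wait behind a slow one:

   ```python
   import time
   from concurrent.futures import ThreadPoolExecutor

   def read_task(task_id: int, delay: float) -> int:
       # Stand-in for a worker reading one file scan task.
       time.sleep(delay)
       return task_id

   with ThreadPoolExecutor(max_workers=2) as pool:
       # Task 0 is slow; task 1 finishes almost immediately.
       futures = [pool.submit(read_task, 0, 0.2), pool.submit(read_task, 1, 0.0)]
       start = time.monotonic()
       # Strict task-order consumption: task 1's already-finished result
       # cannot be emitted until task 0 completes (head-of-line blocking).
       ordered = [f.result() for f in futures]
       elapsed = time.monotonic() - start
   ```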
   
   The problem I would really like to solve immediately is (3), which drastically increases memory utilization (by `# of workers` * `size of file in memory`). I think the right behavior for `to_record_batches` is for each worker to avoid materializing the entire task/file in memory.
   
   I think the bounded queue is a great idea for maximizing throughput while bounding memory usage, but it's only useful if we relax the ordering constraint. It would be great to brainstorm ways to introduce this change into the codebase while keeping it maintainable.
   

