kevinjqliu commented on PR #3046: URL: https://github.com/apache/iceberg-python/pull/3046#issuecomment-4006737089
I took some time to understand how ArrowScan is implemented and used today, and where the potential bottlenecks are. Here's a summary:

ArrowScan has 2 public functions for reading: `to_table` and `to_record_batches`.
- `to_table` materializes the entire table as an Arrow table
- `to_record_batches` returns an iterator, with the expectation of lazy materialization

Let's focus on `to_record_batches`, since `to_table` can be implemented by simply consuming all the record batches. We're looking at this problem both in terms of throughput (how much data can be read) and memory usage (how much data is buffered in memory).

ArrowScan can utilize multiple threads, via the `ExecutorFactory`, to spread tasks across worker threads. I realized a few things while reviewing the original code:
1. There's an implicit ordering of the record batches; the ordering is deterministic based on task order (task 0, task 1, ...). Batches from later tasks are not emitted before earlier tasks complete.
2. Because of the strict ordering combined with multiple worker threads, there's inherent head-of-line blocking: later tasks may finish their work, but their results cannot be emitted until earlier tasks complete. This blocking diminishes ArrowScan's overall throughput when producing record batches.
3. Each worker materializes the entire task/file in memory (via `list(self._record_batches_from_scan_tasks_and_deletes)`) before returning results. This causes higher than expected memory utilization and, imo, is a bug because it contradicts the lazy materialization expected of `to_record_batches`.

To summarize, the current implementation fans out tasks to different workers. Each worker reads/materializes its entire task/file in memory before returning record batches, but only the first worker's record batches are consumed while the other workers wait.
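To make the pattern concrete, here is a minimal, hypothetical sketch of the behavior described above (the names `read_task` and the fake "batches" are illustrative, not the actual pyiceberg code): each submitted job materializes its whole file up front, and the consumer drains futures in strict task order, so a finished later task still waits behind an unfinished earlier one.

```python
from concurrent.futures import ThreadPoolExecutor

def read_task(task_id):
    # Stands in for _record_batches_from_scan_tasks_and_deletes:
    # list(...) materializes every batch for the file before returning,
    # so each worker holds a whole file's worth of batches in memory.
    return list(range(task_id * 3, task_id * 3 + 3))  # fake "record batches"

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(read_task, t) for t in range(4)]

    batches = []
    for fut in futures:               # strict task order (task 0, task 1, ...)
        batches.extend(fut.result())  # head-of-line blocking: later results wait here
```

Regardless of which worker finishes first, the output order is deterministic by task index, which is exactly the ordering constraint that causes the blocking.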
The problem I would really like to solve immediately is (3), which drastically increases memory utilization (by `# of workers` * `size of file in memory`). I think the right behavior for `to_record_batches` is for each worker to stream batches instead of materializing the entire task/file in memory. I think the bounded queue is a great idea for maximizing throughput while bounding memory usage, but it would only be useful once we relax the ordering constraint. It would be great to brainstorm ways to introduce this change into the codebase while ensuring that it's maintainable.
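A rough sketch of the bounded-queue idea under the relaxed-ordering assumption (all names here are hypothetical, not proposed API): workers yield batches one at a time instead of calling `list(...)`, push them onto a `queue.Queue(maxsize=...)` that caps how many batches are buffered, and the consumer emits batches in completion order.

```python
import queue
import threading

SENTINEL = object()  # marks one worker as finished

def stream_batches(tasks, num_workers=2, max_buffered=8):
    """Yield batches from all tasks in completion order, buffering at most
    max_buffered batches. Illustrative only; not the pyiceberg API."""
    out_q = queue.Queue(maxsize=max_buffered)  # bounds memory: producers block when full
    task_q = queue.Queue()
    for t in tasks:
        task_q.put(t)

    def worker():
        while True:
            try:
                task = task_q.get_nowait()
            except queue.Empty:
                break
            for batch in task():   # stream one batch at a time, never list(...)
                out_q.put(batch)   # blocks if the consumer falls behind
        out_q.put(SENTINEL)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for th in threads:
        th.start()

    finished = 0
    while finished < num_workers:
        item = out_q.get()
        if item is SENTINEL:
            finished += 1
        else:
            yield item
    for th in threads:
        th.join()

def make_task(n):
    def gen():
        for i in range(3):
            yield (n, i)  # fake "record batch" tagged with its task id
    return gen

result = list(stream_batches([make_task(n) for n in range(4)]))
```

With this shape, memory is bounded by `max_buffered` batches (plus one in-flight batch per worker) rather than `# of workers * size of file`, at the cost of giving up deterministic task ordering.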
