sdd commented on code in PR #806: URL: https://github.com/apache/iceberg-rust/pull/806#discussion_r1894298314
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -130,62 +128,41 @@ pub struct ArrowReader {
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
+    pub async fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
         let batch_size = self.batch_size;
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;
-        let (tx, rx) = channel(concurrency_limit_data_files);
-        let mut channel_for_error = tx.clone();
-
-        spawn(async move {
-            let result = tasks
-                .map(|task| Ok((task, file_io.clone(), tx.clone())))
-                .try_for_each_concurrent(
-                    concurrency_limit_data_files,
-                    |(file_scan_task, file_io, tx)| async move {
-                        match file_scan_task {
-                            Ok(task) => {
-                                let file_path = task.data_file_path.to_string();
-
-                                spawn(async move {
-                                    Self::process_file_scan_task(
-                                        task,
-                                        batch_size,
-                                        file_io,
-                                        tx,
-                                        row_group_filtering_enabled,
-                                        row_selection_enabled,
-                                    )
-                                    .await
-                                })
-                                .await
-                                .map_err(|e| e.with_context("file_path", file_path))
-                            }
-                            Err(err) => Err(err),
-                        }
-                    },
-                )
-                .await;
+        let stream = tasks
+            .map_ok(move |task| {
+                let file_io = file_io.clone();
-            if let Err(error) = result {
-                let _ = channel_for_error.send(Err(error)).await;
-            }
-        });
+                Self::process_file_scan_task(
+                    task,
+                    batch_size,
+                    file_io,
+                    row_group_filtering_enabled,
+                    row_selection_enabled,
+                )
+            })
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
+            })
+            .try_buffer_unordered(concurrency_limit_data_files)

Review Comment:
If we want to switch to push-based execution, maybe we should get rid of `concurrency_limit_data_files` and use [available_parallelism](https://github.com/apache/iceberg-rust/blob/c8f5d91239951dee0b37594dfdb6f7f9bc74258c/crates/iceberg/src/utils.rs#L34) instead?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org