adriangb commented on code in PR #21342:
URL: https://github.com/apache/datafusion/pull/21342#discussion_r3035638266
##########
datafusion/datasource/src/file_stream/mod.rs:
##########
@@ -904,4 +679,334 @@ mod tests {
);
assert!(err.contains("FileStreamBuilder invalid partition index: 1"));
}
+
+ /// Verifies the simplest morsel-driven flow: one planner produces one
+ /// morsel immediately, and that morsel is then scanned to completion.
+ #[tokio::test]
+ async fn morsel_no_io() -> Result<()> {
+ let test = FileStreamMorselTest::new().with_file(
+ MockPlanner::builder("file1.parquet")
+ .return_morsel(MorselId(10), 42)
+ .return_none()
+ .build(),
+ );
+
+ insta::assert_snapshot!(test.run().await.unwrap(), @r"
+ ----- Output Stream -----
+ Batch: 42
+ Done
+ ----- File Stream Events -----
+ morselize_file: file1.parquet
+ planner_created: file1.parquet
+ planner_called: file1.parquet
+ morsel_produced: file1.parquet, MorselId(10)
+ morsel_stream_started: MorselId(10)
+ morsel_stream_batch_produced: MorselId(10), BatchId(42)
+ morsel_stream_finished: MorselId(10)
+ ");
+
+ Ok(())
+ }
+
+ /// Verifies that a planner can block on one I/O phase and then produce a
+ /// morsel containing two batches.
+ #[tokio::test]
+ async fn morsel_single_io_two_batches() -> Result<()> {
+ let test = FileStreamMorselTest::new().with_file(
+ MockPlanner::builder("file1.parquet")
+ .return_io(IoFutureId(1), PollsToResolve(1))
+ .return_morsel_batches(MorselId(10), vec![42, 43])
+ .return_none()
+ .build(),
+ );
+
+ insta::assert_snapshot!(test.run().await.unwrap(), @r"
+ ----- Output Stream -----
+ Batch: 42
+ Batch: 43
+ Done
+ ----- File Stream Events -----
+ morselize_file: file1.parquet
+ planner_created: file1.parquet
+ planner_called: file1.parquet
+ io_future_created: file1.parquet, IoFutureId(1)
+ io_future_polled: file1.parquet, IoFutureId(1)
+ io_future_polled: file1.parquet, IoFutureId(1)
+ io_future_resolved: file1.parquet, IoFutureId(1)
+ planner_called: file1.parquet
+ morsel_produced: file1.parquet, MorselId(10)
+ morsel_stream_started: MorselId(10)
+ morsel_stream_batch_produced: MorselId(10), BatchId(42)
+ morsel_stream_batch_produced: MorselId(10), BatchId(43)
+ morsel_stream_finished: MorselId(10)
+ ");
+
+ Ok(())
+ }
+
+ /// Verifies that a planner can traverse two sequential I/O phases before
+ /// producing one batch (similar to Parquet which does this0.
Review Comment:
```suggestion
    /// producing one batch (similar to Parquet, which does this).
```
##########
datafusion/datasource/src/file_stream/mod.rs:
##########
@@ -904,4 +679,334 @@ mod tests {
);
assert!(err.contains("FileStreamBuilder invalid partition index: 1"));
}
+
+ /// Verifies the simplest morsel-driven flow: one planner produces one
+ /// morsel immediately, and that morsel is then scanned to completion.
+ #[tokio::test]
+ async fn morsel_no_io() -> Result<()> {
+ let test = FileStreamMorselTest::new().with_file(
+ MockPlanner::builder("file1.parquet")
+ .return_morsel(MorselId(10), 42)
+ .return_none()
+ .build(),
+ );
+
+ insta::assert_snapshot!(test.run().await.unwrap(), @r"
+ ----- Output Stream -----
+ Batch: 42
+ Done
+ ----- File Stream Events -----
+ morselize_file: file1.parquet
+ planner_created: file1.parquet
+ planner_called: file1.parquet
+ morsel_produced: file1.parquet, MorselId(10)
+ morsel_stream_started: MorselId(10)
+ morsel_stream_batch_produced: MorselId(10), BatchId(42)
+ morsel_stream_finished: MorselId(10)
+ ");
+
+ Ok(())
+ }
+
+ /// Verifies that a planner can block on one I/O phase and then produce a
+ /// morsel containing two batches.
+ #[tokio::test]
+ async fn morsel_single_io_two_batches() -> Result<()> {
+ let test = FileStreamMorselTest::new().with_file(
+ MockPlanner::builder("file1.parquet")
+ .return_io(IoFutureId(1), PollsToResolve(1))
+ .return_morsel_batches(MorselId(10), vec![42, 43])
+ .return_none()
+ .build(),
+ );
+
+ insta::assert_snapshot!(test.run().await.unwrap(), @r"
+ ----- Output Stream -----
+ Batch: 42
+ Batch: 43
+ Done
+ ----- File Stream Events -----
+ morselize_file: file1.parquet
+ planner_created: file1.parquet
+ planner_called: file1.parquet
+ io_future_created: file1.parquet, IoFutureId(1)
+ io_future_polled: file1.parquet, IoFutureId(1)
+ io_future_polled: file1.parquet, IoFutureId(1)
+ io_future_resolved: file1.parquet, IoFutureId(1)
+ planner_called: file1.parquet
+ morsel_produced: file1.parquet, MorselId(10)
+ morsel_stream_started: MorselId(10)
+ morsel_stream_batch_produced: MorselId(10), BatchId(42)
+ morsel_stream_batch_produced: MorselId(10), BatchId(43)
+ morsel_stream_finished: MorselId(10)
+ ");
+
+ Ok(())
+ }
+
+ /// Verifies that a planner can traverse two sequential I/O phases before
+ /// producing one batch (similar to Parquet which does this0.
+ #[tokio::test]
+ async fn morsel_two_ios_one_batch() -> Result<()> {
Review Comment:
If I understand correctly the point here is that an implementation may need
to do several rounds of IO before it can start producing morsels (in the case
of Parquet fetching the footer, then possibly needing to fetch more footer data
if the default fetch was not enough). I assume it's also able to do multiple
CPU work iterations? IIRC for Parquet it can look something like `fetch footer
(IO) -> decode footer (CPU) -> fetch page index (IO) -> decode page index
(CPU)`?
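If that's the shape, then — assuming the mock builder quoted above allows chaining `return_io` more than once, which I haven't verified — the two-round case could be spelled out explicitly, e.g.:

```rust
MockPlanner::builder("file1.parquet")
    .return_io(IoFutureId(1), PollsToResolve(1)) // e.g. fetch footer (IO)
    .return_io(IoFutureId(2), PollsToResolve(1)) // e.g. fetch page index (IO)
    .return_morsel(MorselId(10), 42)
    .return_none()
    .build()
```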
##########
datafusion/datasource/src/file_stream/scan_state.rs:
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll};
+
+use crate::PartitionedFile;
+use crate::morsel::{Morsel, MorselPlanner, Morselizer};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_plan::metrics::ScopedTimerGuard;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{FutureExt as _, StreamExt as _};
+
+use super::{FileStreamMetrics, OnError};
+
+/// Planner-owned asynchronous I/O discovered while planning a file.
+///
+/// Once `io_future` completes, `planner` becomes CPU-ready again and can be
+/// pushed back onto the scan queue for further planning.
+struct PendingOpen {
+ /// The planner to resume after the I/O completes.
+ planner: Box<dyn MorselPlanner>,
+ /// The outstanding I/O future for `planner`.
+ io_future: BoxFuture<'static, Result<()>>,
+}
+
+/// All mutable state for the active `FileStreamState::Scan` lifecycle.
+///
+/// This groups together ready planners, ready morsels, the active reader,
+/// pending planner I/O, the remaining files and limit, and the metrics
+/// associated with processing that work.
+pub(super) struct ScanState {
Review Comment:
I think some more diagrams in the docstring of the struct and/or fields
could help. I'm trying to wrap my head around how the IO queue and such work.
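For example, something along these lines (my best guess at the flow from reading the fields, so the arrows may well be wrong):

```
file_iter ──morselize_file──▶ ready_planners ──plan()──▶ ready_morsels
                                  │      ▲                     │
                         needs IO │      │ IO resolved         │ into_stream()
                                  ▼      │                     ▼
                              pending_open                   reader ──▶ output batches
```

(`plan()` can apparently also push follow-up planners back onto `ready_planners`, which the diagram doesn't show.)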
##########
datafusion/datasource/src/file_stream/scan_state.rs:
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll};
+
+use crate::PartitionedFile;
+use crate::morsel::{Morsel, MorselPlanner, Morselizer};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_plan::metrics::ScopedTimerGuard;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{FutureExt as _, StreamExt as _};
+
+use super::{FileStreamMetrics, OnError};
+
+/// Planner-owned asynchronous I/O discovered while planning a file.
+///
+/// Once `io_future` completes, `planner` becomes CPU-ready again and can be
+/// pushed back onto the scan queue for further planning.
+struct PendingOpen {
+ /// The planner to resume after the I/O completes.
+ planner: Box<dyn MorselPlanner>,
+ /// The outstanding I/O future for `planner`.
+ io_future: BoxFuture<'static, Result<()>>,
+}
+
+/// All mutable state for the active `FileStreamState::Scan` lifecycle.
+///
+/// This groups together ready planners, ready morsels, the active reader,
+/// pending planner I/O, the remaining files and limit, and the metrics
+/// associated with processing that work.
+pub(super) struct ScanState {
+ /// Files that still need to be planned.
+ file_iter: VecDeque<PartitionedFile>,
+ /// Remaining record limit, if any.
+ remain: Option<usize>,
+ /// The file-format-specific morselizer used to plan files.
+ morselizer: Box<dyn Morselizer>,
+ /// Describes the behavior if opening or scanning a file fails.
+ on_error: OnError,
+ /// CPU-ready planners for the current file.
+ ready_planners: VecDeque<Box<dyn MorselPlanner>>,
+ /// Ready morsels for the current file.
+ ready_morsels: VecDeque<Box<dyn Morsel>>,
+ /// The active reader, if any.
+ reader: Option<BoxStream<'static, Result<RecordBatch>>>,
+ /// Planner-owned asynchronous I/O that must complete before more planner
+ /// CPU work can be scheduled.
+ pending_open: Option<PendingOpen>,
+ /// Metrics for the active scan queues.
+ metrics: FileStreamMetrics,
+}
+
+impl ScanState {
+ pub(super) fn new(
+ file_iter: impl Into<VecDeque<PartitionedFile>>,
+ remain: Option<usize>,
+ morselizer: Box<dyn Morselizer>,
+ on_error: OnError,
+ metrics: FileStreamMetrics,
+ ) -> Self {
+ let file_iter = file_iter.into();
+ Self {
+ file_iter,
+ remain,
+ morselizer,
+ on_error,
+ ready_planners: Default::default(),
+ ready_morsels: Default::default(),
+ reader: None,
+ pending_open: None,
+ metrics,
+ }
+ }
+
+ /// Updates how scan errors are handled while the stream is still active.
+ pub(super) fn set_on_error(&mut self, on_error: OnError) {
+ self.on_error = on_error;
+ }
+
+ /// Drives one iteration of the active scan state, consuming ready morsels,
+ /// planners, pending planner I/O, or unopened files from `self`.
+ ///
+ /// The return [`ScanAndReturn`] tells `poll_inner` how to update the
+ /// outer `FileStreamState`.
+ pub(super) fn poll_scan(&mut self, cx: &mut Context<'_>) -> ScanAndReturn {
+ let _processing_timer: ScopedTimerGuard<'_> =
+ self.metrics.time_processing.timer();
+ if let Some(pending_open) = self.pending_open.take() {
+ let PendingOpen {
+ planner,
+ mut io_future,
+ } = pending_open;
+ match io_future.poll_unpin(cx) {
+ Poll::Pending => {
+ self.pending_open = Some(PendingOpen { planner, io_future });
+ return ScanAndReturn::Return(Poll::Pending);
+ }
Review Comment:
I'd find some comments here helpful, e.g:
```suggestion
// We polled the IO future but it didn't complete.
// Back to the same state and wait until the next round of polling.
self.pending_open = Some(PendingOpen { planner, io_future });
return ScanAndReturn::Return(Poll::Pending);
```
##########
datafusion/datasource/src/file_stream/scan_state.rs:
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll};
+
+use crate::PartitionedFile;
+use crate::morsel::{Morsel, MorselPlanner, Morselizer};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_plan::metrics::ScopedTimerGuard;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{FutureExt as _, StreamExt as _};
+
+use super::{FileStreamMetrics, OnError};
+
+/// Planner-owned asynchronous I/O discovered while planning a file.
+///
+/// Once `io_future` completes, `planner` becomes CPU-ready again and can be
+/// pushed back onto the scan queue for further planning.
+struct PendingOpen {
+ /// The planner to resume after the I/O completes.
+ planner: Box<dyn MorselPlanner>,
+ /// The outstanding I/O future for `planner`.
+ io_future: BoxFuture<'static, Result<()>>,
+}
+
+/// All mutable state for the active `FileStreamState::Scan` lifecycle.
+///
+/// This groups together ready planners, ready morsels, the active reader,
+/// pending planner I/O, the remaining files and limit, and the metrics
+/// associated with processing that work.
+pub(super) struct ScanState {
+ /// Files that still need to be planned.
+ file_iter: VecDeque<PartitionedFile>,
+ /// Remaining record limit, if any.
+ remain: Option<usize>,
+ /// The file-format-specific morselizer used to plan files.
+ morselizer: Box<dyn Morselizer>,
+ /// Describes the behavior if opening or scanning a file fails.
+ on_error: OnError,
+ /// CPU-ready planners for the current file.
+ ready_planners: VecDeque<Box<dyn MorselPlanner>>,
+ /// Ready morsels for the current file.
+ ready_morsels: VecDeque<Box<dyn Morsel>>,
+ /// The active reader, if any.
+ reader: Option<BoxStream<'static, Result<RecordBatch>>>,
+ /// Planner-owned asynchronous I/O that must complete before more planner
+ /// CPU work can be scheduled.
+ pending_open: Option<PendingOpen>,
+ /// Metrics for the active scan queues.
+ metrics: FileStreamMetrics,
+}
+
+impl ScanState {
+ pub(super) fn new(
+ file_iter: impl Into<VecDeque<PartitionedFile>>,
+ remain: Option<usize>,
+ morselizer: Box<dyn Morselizer>,
+ on_error: OnError,
+ metrics: FileStreamMetrics,
+ ) -> Self {
+ let file_iter = file_iter.into();
+ Self {
+ file_iter,
+ remain,
+ morselizer,
+ on_error,
+ ready_planners: Default::default(),
+ ready_morsels: Default::default(),
+ reader: None,
+ pending_open: None,
+ metrics,
+ }
+ }
+
+ /// Updates how scan errors are handled while the stream is still active.
+ pub(super) fn set_on_error(&mut self, on_error: OnError) {
+ self.on_error = on_error;
+ }
+
+ /// Drives one iteration of the active scan state, consuming ready morsels,
+ /// planners, pending planner I/O, or unopened files from `self`.
+ ///
+ /// The return [`ScanAndReturn`] tells `poll_inner` how to update the
+ /// outer `FileStreamState`.
+ pub(super) fn poll_scan(&mut self, cx: &mut Context<'_>) -> ScanAndReturn {
+ let _processing_timer: ScopedTimerGuard<'_> =
+ self.metrics.time_processing.timer();
+ if let Some(pending_open) = self.pending_open.take() {
+ let PendingOpen {
+ planner,
+ mut io_future,
+ } = pending_open;
+ match io_future.poll_unpin(cx) {
+ Poll::Pending => {
+ self.pending_open = Some(PendingOpen { planner, io_future });
+ return ScanAndReturn::Return(Poll::Pending);
+ }
+ Poll::Ready(Ok(())) => {
+ self.ready_planners.push_back(planner);
+ return ScanAndReturn::Continue;
+ }
+ Poll::Ready(Err(err)) => {
+ self.metrics.file_open_errors.add(1);
+ self.metrics.time_opening.stop();
+ return match self.on_error {
+ OnError::Skip => {
+ self.metrics.files_processed.add(1);
+ ScanAndReturn::Continue
+ }
+ OnError::Fail => ScanAndReturn::Error(err),
+ };
+ }
+ }
+ }
+
+ if let Some(reader) = self.reader.as_mut() {
+ match reader.poll_next_unpin(cx) {
+ Poll::Pending => return ScanAndReturn::Return(Poll::Pending),
+ Poll::Ready(Some(Ok(batch))) => {
+ self.metrics.time_scanning_until_data.stop();
+ self.metrics.time_scanning_total.stop();
+ let (batch, finished) = match &mut self.remain {
+ Some(remain) => {
+ if *remain > batch.num_rows() {
+ *remain -= batch.num_rows();
+ self.metrics.time_scanning_total.start();
+ (batch, false)
+ } else {
+ let batch = batch.slice(0, *remain);
+ let done = 1 + self.file_iter.len();
+ self.metrics.files_processed.add(done);
+ *remain = 0;
+ (batch, true)
+ }
+ }
+ None => {
+ self.metrics.time_scanning_total.start();
+ (batch, false)
+ }
+ };
+ return if finished {
+ ScanAndReturn::Done(Some(Ok(batch)))
+ } else {
+ ScanAndReturn::Return(Poll::Ready(Some(Ok(batch))))
+ };
+ }
+ Poll::Ready(Some(Err(err))) => {
+ self.reader = None;
+ self.metrics.file_scan_errors.add(1);
+ self.metrics.time_scanning_until_data.stop();
+ self.metrics.time_scanning_total.stop();
+ return match self.on_error {
+ OnError::Skip => {
+ self.metrics.files_processed.add(1);
+ ScanAndReturn::Continue
+ }
+ OnError::Fail => ScanAndReturn::Error(err),
+ };
+ }
+ Poll::Ready(None) => {
+ self.reader = None;
+ self.metrics.files_processed.add(1);
+ self.metrics.time_scanning_until_data.stop();
+ self.metrics.time_scanning_total.stop();
+ return ScanAndReturn::Continue;
+ }
+ }
+ }
+
+ if let Some(morsel) = self.ready_morsels.pop_front() {
+ self.metrics.files_opened.add(1);
Review Comment:
Does a morsel map to a file opened? I thought opening a file produces the
morsels (i.e. this metric should be incremented elsewhere).
##########
datafusion/datasource/src/file_stream/scan_state.rs:
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll};
+
+use crate::PartitionedFile;
+use crate::morsel::{Morsel, MorselPlanner, Morselizer};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_plan::metrics::ScopedTimerGuard;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{FutureExt as _, StreamExt as _};
+
+use super::{FileStreamMetrics, OnError};
+
+/// Planner-owned asynchronous I/O discovered while planning a file.
+///
+/// Once `io_future` completes, `planner` becomes CPU-ready again and can be
+/// pushed back onto the scan queue for further planning.
+struct PendingOpen {
+ /// The planner to resume after the I/O completes.
+ planner: Box<dyn MorselPlanner>,
+ /// The outstanding I/O future for `planner`.
+ io_future: BoxFuture<'static, Result<()>>,
+}
+
+/// All mutable state for the active `FileStreamState::Scan` lifecycle.
+///
+/// This groups together ready planners, ready morsels, the active reader,
+/// pending planner I/O, the remaining files and limit, and the metrics
+/// associated with processing that work.
+pub(super) struct ScanState {
+ /// Files that still need to be planned.
+ file_iter: VecDeque<PartitionedFile>,
+ /// Remaining record limit, if any.
+ remain: Option<usize>,
+ /// The file-format-specific morselizer used to plan files.
+ morselizer: Box<dyn Morselizer>,
+ /// Describes the behavior if opening or scanning a file fails.
+ on_error: OnError,
+ /// CPU-ready planners for the current file.
+ ready_planners: VecDeque<Box<dyn MorselPlanner>>,
+ /// Ready morsels for the current file.
+ ready_morsels: VecDeque<Box<dyn Morsel>>,
+ /// The active reader, if any.
+ reader: Option<BoxStream<'static, Result<RecordBatch>>>,
+ /// Planner-owned asynchronous I/O that must complete before more planner
+ /// CPU work can be scheduled.
+ pending_open: Option<PendingOpen>,
+ /// Metrics for the active scan queues.
+ metrics: FileStreamMetrics,
+}
+
+impl ScanState {
+ pub(super) fn new(
+ file_iter: impl Into<VecDeque<PartitionedFile>>,
+ remain: Option<usize>,
+ morselizer: Box<dyn Morselizer>,
+ on_error: OnError,
+ metrics: FileStreamMetrics,
+ ) -> Self {
+ let file_iter = file_iter.into();
+ Self {
+ file_iter,
+ remain,
+ morselizer,
+ on_error,
+ ready_planners: Default::default(),
+ ready_morsels: Default::default(),
+ reader: None,
+ pending_open: None,
+ metrics,
+ }
+ }
+
+ /// Updates how scan errors are handled while the stream is still active.
+ pub(super) fn set_on_error(&mut self, on_error: OnError) {
+ self.on_error = on_error;
+ }
+
+ /// Drives one iteration of the active scan state, consuming ready morsels,
+ /// planners, pending planner I/O, or unopened files from `self`.
+ ///
+ /// The return [`ScanAndReturn`] tells `poll_inner` how to update the
+ /// outer `FileStreamState`.
+ pub(super) fn poll_scan(&mut self, cx: &mut Context<'_>) -> ScanAndReturn {
+ let _processing_timer: ScopedTimerGuard<'_> =
+ self.metrics.time_processing.timer();
+ if let Some(pending_open) = self.pending_open.take() {
+ let PendingOpen {
+ planner,
+ mut io_future,
+ } = pending_open;
+ match io_future.poll_unpin(cx) {
+ Poll::Pending => {
+ self.pending_open = Some(PendingOpen { planner, io_future });
+ return ScanAndReturn::Return(Poll::Pending);
+ }
+ Poll::Ready(Ok(())) => {
+ self.ready_planners.push_back(planner);
+ return ScanAndReturn::Continue;
+ }
+ Poll::Ready(Err(err)) => {
+ self.metrics.file_open_errors.add(1);
+ self.metrics.time_opening.stop();
+ return match self.on_error {
+ OnError::Skip => {
+ self.metrics.files_processed.add(1);
+ ScanAndReturn::Continue
+ }
+ OnError::Fail => ScanAndReturn::Error(err),
+ };
+ }
+ }
+ }
+
+ if let Some(reader) = self.reader.as_mut() {
+ match reader.poll_next_unpin(cx) {
+ Poll::Pending => return ScanAndReturn::Return(Poll::Pending),
+ Poll::Ready(Some(Ok(batch))) => {
+ self.metrics.time_scanning_until_data.stop();
+ self.metrics.time_scanning_total.stop();
+ let (batch, finished) = match &mut self.remain {
+ Some(remain) => {
+ if *remain > batch.num_rows() {
+ *remain -= batch.num_rows();
+ self.metrics.time_scanning_total.start();
+ (batch, false)
+ } else {
+ let batch = batch.slice(0, *remain);
+ let done = 1 + self.file_iter.len();
+ self.metrics.files_processed.add(done);
+ *remain = 0;
+ (batch, true)
+ }
+ }
+ None => {
+ self.metrics.time_scanning_total.start();
+ (batch, false)
+ }
+ };
+ return if finished {
+ ScanAndReturn::Done(Some(Ok(batch)))
+ } else {
+ ScanAndReturn::Return(Poll::Ready(Some(Ok(batch))))
+ };
+ }
+ Poll::Ready(Some(Err(err))) => {
+ self.reader = None;
+ self.metrics.file_scan_errors.add(1);
+ self.metrics.time_scanning_until_data.stop();
+ self.metrics.time_scanning_total.stop();
+ return match self.on_error {
+ OnError::Skip => {
+ self.metrics.files_processed.add(1);
+ ScanAndReturn::Continue
+ }
+ OnError::Fail => ScanAndReturn::Error(err),
+ };
+ }
+ Poll::Ready(None) => {
+ self.reader = None;
+ self.metrics.files_processed.add(1);
+ self.metrics.time_scanning_until_data.stop();
+ self.metrics.time_scanning_total.stop();
+ return ScanAndReturn::Continue;
+ }
+ }
+ }
+
+ if let Some(morsel) = self.ready_morsels.pop_front() {
+ self.metrics.files_opened.add(1);
+ self.metrics.time_opening.stop();
+ self.metrics.time_scanning_until_data.start();
+ self.metrics.time_scanning_total.start();
+ self.reader = Some(morsel.into_stream());
+ return ScanAndReturn::Continue;
+ }
+
+ if let Some(mut planner) = self.ready_planners.pop_front() {
+ return match planner.plan() {
+ Ok(Some(mut plan)) => {
+ self.ready_morsels.extend(plan.take_morsels());
+ self.ready_planners.extend(plan.take_planners());
Review Comment:
I see now, a planner can produce more planners (this is how it cycles
through IO and CPU)
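A stripped-down model of that cycle (hypothetical names, not the PR's actual types) might look like the following: planning one unit can yield both morsels to scan and follow-up planners, and the loop feeds the latter back onto the same queue.

```rust
use std::collections::VecDeque;

// Stand-ins for the PR's Box<dyn Morsel> / Box<dyn MorselPlanner>.
struct Plan {
    morsels: Vec<u32>,      // morsels ready to be scanned
    planners: Vec<Planner>, // follow-up planners produced by this step
}

struct Planner {
    plan: Option<Plan>,
}

impl Planner {
    fn plan(&mut self) -> Option<Plan> {
        self.plan.take()
    }
}

/// Mirrors the poll_scan loop above: pop a CPU-ready planner, run it,
/// and push both its morsels and any follow-up planners back onto the queues.
fn drain(mut ready_planners: VecDeque<Planner>) -> Vec<u32> {
    let mut ready_morsels = VecDeque::new();
    while let Some(mut planner) = ready_planners.pop_front() {
        if let Some(plan) = planner.plan() {
            ready_morsels.extend(plan.morsels);
            ready_planners.extend(plan.planners);
        }
    }
    ready_morsels.into_iter().collect()
}

fn main() {
    // One planner produces one morsel plus a follow-up planner,
    // which in turn produces another morsel.
    let follow_up = Planner {
        plan: Some(Plan { morsels: vec![2], planners: vec![] }),
    };
    let root = Planner {
        plan: Some(Plan { morsels: vec![1], planners: vec![follow_up] }),
    };
    let morsels = drain(VecDeque::from(vec![root]));
    assert_eq!(morsels, vec![1, 2]);
    println!("{morsels:?}");
}
```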
##########
datafusion/datasource/src/file_stream/scan_state.rs:
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll};
+
+use crate::PartitionedFile;
+use crate::morsel::{Morsel, MorselPlanner, Morselizer};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_plan::metrics::ScopedTimerGuard;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{FutureExt as _, StreamExt as _};
+
+use super::{FileStreamMetrics, OnError};
+
+/// Planner-owned asynchronous I/O discovered while planning a file.
+///
+/// Once `io_future` completes, `planner` becomes CPU-ready again and can be
+/// pushed back onto the scan queue for further planning.
+struct PendingOpen {
+ /// The planner to resume after the I/O completes.
+ planner: Box<dyn MorselPlanner>,
+ /// The outstanding I/O future for `planner`.
+ io_future: BoxFuture<'static, Result<()>>,
+}
+
+/// All mutable state for the active `FileStreamState::Scan` lifecycle.
+///
+/// This groups together ready planners, ready morsels, the active reader,
+/// pending planner I/O, the remaining files and limit, and the metrics
+/// associated with processing that work.
+pub(super) struct ScanState {
+ /// Files that still need to be planned.
+ file_iter: VecDeque<PartitionedFile>,
+ /// Remaining record limit, if any.
Review Comment:
record = row in this context?
##########
datafusion/datasource/src/file_stream/scan_state.rs:
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll};
+
+use crate::PartitionedFile;
+use crate::morsel::{Morsel, MorselPlanner, Morselizer};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_plan::metrics::ScopedTimerGuard;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{FutureExt as _, StreamExt as _};
+
+use super::{FileStreamMetrics, OnError};
+
+/// Planner-owned asynchronous I/O discovered while planning a file.
+///
+/// Once `io_future` completes, `planner` becomes CPU-ready again and can be
+/// pushed back onto the scan queue for further planning.
+struct PendingOpen {
+ /// The planner to resume after the I/O completes.
+ planner: Box<dyn MorselPlanner>,
+ /// The outstanding I/O future for `planner`.
+ io_future: BoxFuture<'static, Result<()>>,
+}
+
+/// All mutable state for the active `FileStreamState::Scan` lifecycle.
+///
+/// This groups together ready planners, ready morsels, the active reader,
+/// pending planner I/O, the remaining files and limit, and the metrics
+/// associated with processing that work.
+pub(super) struct ScanState {
+ /// Files that still need to be planned.
+ file_iter: VecDeque<PartitionedFile>,
+ /// Remaining record limit, if any.
+ remain: Option<usize>,
+ /// The file-format-specific morselizer used to plan files.
+ morselizer: Box<dyn Morselizer>,
+ /// Describes the behavior if opening or scanning a file fails.
+ on_error: OnError,
+ /// CPU-ready planners for the current file.
+ ready_planners: VecDeque<Box<dyn MorselPlanner>>,
+ /// Ready morsels for the current file.
+ ready_morsels: VecDeque<Box<dyn Morsel>>,
+ /// The active reader, if any.
+ reader: Option<BoxStream<'static, Result<RecordBatch>>>,
+ /// Planner-owned asynchronous I/O that must complete before more planner
+ /// CPU work can be scheduled.
+ pending_open: Option<PendingOpen>,
+ /// Metrics for the active scan queues.
+ metrics: FileStreamMetrics,
+}
+
+impl ScanState {
+ pub(super) fn new(
+ file_iter: impl Into<VecDeque<PartitionedFile>>,
+ remain: Option<usize>,
+ morselizer: Box<dyn Morselizer>,
+ on_error: OnError,
+ metrics: FileStreamMetrics,
+ ) -> Self {
+ let file_iter = file_iter.into();
+ Self {
+ file_iter,
+ remain,
+ morselizer,
+ on_error,
+ ready_planners: Default::default(),
+ ready_morsels: Default::default(),
+ reader: None,
+ pending_open: None,
+ metrics,
+ }
+ }
+
+ /// Updates how scan errors are handled while the stream is still active.
+ pub(super) fn set_on_error(&mut self, on_error: OnError) {
+ self.on_error = on_error;
+ }
+
+ /// Drives one iteration of the active scan state, consuming ready morsels,
+ /// planners, pending planner I/O, or unopened files from `self`.
+ ///
+ /// The return [`ScanAndReturn`] tells `poll_inner` how to update the
+ /// outer `FileStreamState`.
+ pub(super) fn poll_scan(&mut self, cx: &mut Context<'_>) -> ScanAndReturn {
+ let _processing_timer: ScopedTimerGuard<'_> =
+ self.metrics.time_processing.timer();
+ if let Some(pending_open) = self.pending_open.take() {
+ let PendingOpen {
+ planner,
+ mut io_future,
+ } = pending_open;
+ match io_future.poll_unpin(cx) {
+ Poll::Pending => {
+ self.pending_open = Some(PendingOpen { planner, io_future });
+ return ScanAndReturn::Return(Poll::Pending);
+ }
+ Poll::Ready(Ok(())) => {
+ self.ready_planners.push_back(planner);
+ return ScanAndReturn::Continue;
Review Comment:
Similar to above. Although the abstractions help with encapsulation, I really
can't tell what is supposed to happen just because `ScanAndReturn::Continue` is
returned from here. There is a complex chain of `FileStream::poll` ->
`FileStream::poll_inner` -> `ScanState::poll_scan` that is hard to track. I
think LLMs will have no problem tracking through it, but us mere humans could
be helped by some summary comments on each branch.
```suggestion
// We polled the file open IO future and it completed.
// It yielded us a `MorselPlanner` which we store.
// Now we can move onto polling the next file open.
self.ready_planners.push_back(planner);
return ScanAndReturn::Continue;
```
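To make the contract concrete, here is a toy, synchronous sketch of the `Continue`/`Return` loop. The names mirror the real `poll_inner` -> `poll_scan` chain, but every body below is invented for illustration:

```rust
// Toy, synchronous stand-in for the FileStream::poll_inner ->
// ScanState::poll_scan loop. All bodies are invented for illustration.
enum ScanAndReturn {
    /// Internal state advanced; re-enter poll_scan immediately.
    Continue,
    /// Something to hand outward (a batch, Pending, or end-of-stream).
    Return(i32),
}

/// Stand-in for ScanState::poll_scan: does one unit of work per call.
fn poll_scan(steps_done: &mut u32) -> ScanAndReturn {
    *steps_done += 1;
    if *steps_done < 3 {
        ScanAndReturn::Continue // e.g. a planner became ready; keep driving
    } else {
        ScanAndReturn::Return(42) // e.g. a batch is ready to yield
    }
}

/// Stand-in for FileStream::poll_inner: loops until poll_scan says to return.
fn poll_inner(steps_done: &mut u32) -> i32 {
    loop {
        match poll_scan(steps_done) {
            ScanAndReturn::Continue => continue,
            ScanAndReturn::Return(v) => return v,
        }
    }
}
```

With a summary comment on each branch (as suggested above), the reader can follow the loop without holding the whole chain in their head.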
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -511,11 +512,22 @@ impl From<ParquetSource> for Arc<dyn FileSource> {
impl FileSource for ParquetSource {
fn create_file_opener(
+ &self,
+ _object_store: Arc<dyn ObjectStore>,
+ _base_config: &FileScanConfig,
+ _partition: usize,
+ ) -> datafusion_common::Result<Arc<dyn FileOpener>> {
+ datafusion_common::internal_err!(
+ "ParquetSource::create_file_opener called but it supports the Morsel API"
Review Comment:
```suggestion
"ParquetSource::create_file_opener called but it supports the Morsel API, please use that instead"
```
Note that this will be a breaking change for folks using `ParquetSource`
directly (which I believe @xudong963 / @zhuqi-lucas are based on
https://github.com/apache/datafusion/issues/21290).
##########
datafusion/datasource/src/file.rs:
##########
@@ -63,13 +64,33 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource>
///
/// [`DataSource`]: crate::source::DataSource
pub trait FileSource: Send + Sync {
- /// Creates a `dyn FileOpener` based on given parameters
+ /// Creates a `dyn FileOpener` based on given parameters.
+ ///
+ /// Note: File sources with a native morsel implementation should return an
+ /// error from this method and implement [`Self::create_morselizer`] instead.
fn create_file_opener(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> Result<Arc<dyn FileOpener>>;
+
+ /// Creates a `dyn Morselizer` based on given parameters.
+ ///
+ /// The default implementation preserves existing behavior by adapting the
+ /// legacy [`FileOpener`] API into a [`Morselizer`].
+ ///
+ /// It is preferred to implement the [`Morselizer`] API directly by
+ /// overriding this method.
+ fn create_morselizer(
+ &self,
+ object_store: Arc<dyn ObjectStore>,
+ base_config: &FileScanConfig,
+ partition: usize,
+ ) -> Result<Box<dyn Morselizer>> {
+ let opener = self.create_file_opener(object_store, base_config, partition)?;
+ Ok(Box::new(FileOpenerMorselizer::new(opener)))
+ }
Review Comment:
🚀
##########
datafusion/datasource/src/file_stream/builder.rs:
##########
@@ -51,8 +54,18 @@ impl<'a> FileStreamBuilder<'a> {
}
/// Configure the [`FileOpener`] used to open files.
+ ///
+ /// This will overwrite any setting from [`Self::with_morselizer`]
pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self {
Review Comment:
While I think it could make sense to keep `FileOpener` as a public API for
building data sources (if we consider it simpler, for folks who don't care
about perf), this method in particular seems like a mostly internal method
(even if it is pub), one we might as well deprecate / remove.
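If we go the deprecation route, a minimal sketch could look like the following. The `String` fields are stand-ins for the real `Arc<dyn FileOpener>` / `Box<dyn Morselizer>` types, and the deprecation note text is only a suggestion:

```rust
// Sketch of deprecating the legacy setter in favor of with_morselizer.
// String fields are stand-ins for Arc<dyn FileOpener> / Box<dyn Morselizer>.
#[derive(Default)]
struct FileStreamBuilder {
    file_opener: Option<String>,
    morselizer: Option<String>,
}

impl FileStreamBuilder {
    /// Legacy path, kept for compatibility but steered away from.
    #[deprecated(note = "use with_morselizer; FileOpener is the legacy scan API")]
    #[allow(dead_code)]
    fn with_file_opener(mut self, opener: String) -> Self {
        self.morselizer = None; // overwrites any morselizer setting
        self.file_opener = Some(opener);
        self
    }

    /// Preferred path.
    fn with_morselizer(mut self, morselizer: String) -> Self {
        self.file_opener = None;
        self.morselizer = Some(morselizer);
        self
    }
}
```

The `#[deprecated]` attribute nudges callers off the legacy path at compile time without breaking them immediately.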
##########
datafusion/datasource/src/file_stream/scan_state.rs:
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll};
+
+use crate::PartitionedFile;
+use crate::morsel::{Morsel, MorselPlanner, Morselizer};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_plan::metrics::ScopedTimerGuard;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{FutureExt as _, StreamExt as _};
+
+use super::{FileStreamMetrics, OnError};
+
+/// Planner-owned asynchronous I/O discovered while planning a file.
+///
+/// Once `io_future` completes, `planner` becomes CPU-ready again and can be
+/// pushed back onto the scan queue for further planning.
+struct PendingOpen {
+ /// The planner to resume after the I/O completes.
+ planner: Box<dyn MorselPlanner>,
+ /// The outstanding I/O future for `planner`.
+ io_future: BoxFuture<'static, Result<()>>,
+}
+
+/// All mutable state for the active `FileStreamState::Scan` lifecycle.
+///
+/// This groups together ready planners, ready morsels, the active reader,
+/// pending planner I/O, the remaining files and limit, and the metrics
+/// associated with processing that work.
+pub(super) struct ScanState {
+ /// Files that still need to be planned.
+ file_iter: VecDeque<PartitionedFile>,
+ /// Remaining record limit, if any.
+ remain: Option<usize>,
+ /// The file-format-specific morselizer used to plan files.
+ morselizer: Box<dyn Morselizer>,
+ /// Describes the behavior if opening or scanning a file fails.
+ on_error: OnError,
+ /// CPU-ready planners for the current file.
+ ready_planners: VecDeque<Box<dyn MorselPlanner>>,
+ /// Ready morsels for the current file.
+ ready_morsels: VecDeque<Box<dyn Morsel>>,
Review Comment:
My understanding of the state machine is `File -> MorselPlanner` (via
`Morselizer`, an IO operation) and then `MorselPlanner` -> `Morsel` (a CPU
operation) and finally `Morsel -> RecordBatch(es)` (IO). Is that right?
My fear is that:
1. This adequately describes Parquet, but for arbitrary formats there may be
arbitrary interleavings of IO and CPU, so this may not map cleanly.
2. Operations like `Morsel` -> `RecordBatch` are still a mix of IO/CPU
(especially with filter pushdown on).
But maybe I'm misunderstanding the flow or intention here.
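For discussion, here is a toy, synchronous model of that understanding of the flow. Every name below (`plan_file`, `next_morsel`, `read_morsel`) is invented; only the shape of the `File -> MorselPlanner -> Morsel -> RecordBatch` chain is taken from the PR:

```rust
use std::collections::VecDeque;

// Toy model of the morsel pipeline; all names and bodies are invented.
struct File(&'static str);
struct Planner { file: &'static str, morsels_left: usize }
struct Morsel { file: &'static str, id: usize }

/// IO: open the file and build a planner for it (the Morselizer's job).
fn plan_file(f: File) -> Planner {
    Planner { file: f.0, morsels_left: 2 }
}

/// CPU: the planner emits morsels until the file is exhausted.
fn next_morsel(p: &mut Planner) -> Option<Morsel> {
    if p.morsels_left == 0 {
        return None;
    }
    p.morsels_left -= 1;
    Some(Morsel { file: p.file, id: p.morsels_left })
}

/// IO (mixed with CPU when filters are pushed down): scanning a morsel
/// yields record batches, modeled here as plain numbers.
fn read_morsel(m: Morsel) -> Vec<usize> {
    debug_assert!(!m.file.is_empty());
    vec![m.id * 10, m.id * 10 + 1]
}

/// Drive the chain for a queue of files, flattening everything into one
/// output sequence, the way one ScanState feeds one reader.
fn scan(files: Vec<File>) -> Vec<usize> {
    let mut files: VecDeque<File> = files.into();
    let mut out = Vec::new();
    while let Some(f) = files.pop_front() {
        let mut planner = plan_file(f); // File -> MorselPlanner (IO)
        while let Some(m) = next_morsel(&mut planner) { // Planner -> Morsel (CPU)
            out.extend(read_morsel(m)); // Morsel -> RecordBatches (IO)
        }
    }
    out
}
```

If the real flow allows extra IO/CPU turns inside any of these arrows (point 1 above), this three-stage picture would need to loosen up.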
##########
datafusion/datasource/src/file_stream/mod.rs:
##########
@@ -89,105 +79,31 @@ impl FileStream {
/// If `OnError::Skip` the stream will skip files which encounter an error and continue
/// If `OnError::Fail` (default) the stream will fail and stop processing when an error occurs
pub fn with_on_error(mut self, on_error: OnError) -> Self {
- self.on_error = on_error;
+ if let FileStreamState::Scan { scan_state } = &mut self.state {
+ scan_state.set_on_error(on_error);
+ }
self
Review Comment:
Currently this is the only state it makes sense to modify (the others are
terminal states). But I did have to go check the `FileStreamState` enum to
confirm. Might be worth either adding a comment here, or just doing a `match`
with `FileStreamState::Error(_) | FileStreamState::Done(_)` arms and a comment
on top explaining those are terminal states, to force ourselves to handle new
cases if they are ever added. It would be an annoying bug to debug; worth the
1 LOC IMO.
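A minimal sketch of what that exhaustive match could look like. The variant shapes here are assumed from the diff (the real `FileStreamState` carries different payloads):

```rust
// Sketch of the suggested exhaustive match; variant payloads are invented.
enum FileStreamState {
    Scan { on_error_skip: bool }, // stand-in for Scan { scan_state }
    Error,
    Done,
}

fn with_on_error(state: &mut FileStreamState, skip: bool) {
    match state {
        FileStreamState::Scan { on_error_skip } => *on_error_skip = skip,
        // Terminal states: nothing left to configure. Listing them
        // explicitly (instead of `_ => {}`) forces a compile error, and
        // therefore a conscious decision, if a new state is ever added.
        FileStreamState::Error | FileStreamState::Done => {}
    }
}
```

The non-wildcard arms are what buy the future-proofing: adding a fourth state makes this match stop compiling.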
##########
datafusion/datasource/src/file_stream/scan_state.rs:
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll};
+
+use crate::PartitionedFile;
+use crate::morsel::{Morsel, MorselPlanner, Morselizer};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_plan::metrics::ScopedTimerGuard;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{FutureExt as _, StreamExt as _};
+
+use super::{FileStreamMetrics, OnError};
+
+/// Planner-owned asynchronous I/O discovered while planning a file.
+///
+/// Once `io_future` completes, `planner` becomes CPU-ready again and can be
+/// pushed back onto the scan queue for further planning.
+struct PendingOpen {
+ /// The planner to resume after the I/O completes.
+ planner: Box<dyn MorselPlanner>,
+ /// The outstanding I/O future for `planner`.
+ io_future: BoxFuture<'static, Result<()>>,
+}
+
+/// All mutable state for the active `FileStreamState::Scan` lifecycle.
+///
+/// This groups together ready planners, ready morsels, the active reader,
+/// pending planner I/O, the remaining files and limit, and the metrics
+/// associated with processing that work.
+pub(super) struct ScanState {
+ /// Files that still need to be planned.
+ file_iter: VecDeque<PartitionedFile>,
+ /// Remaining record limit, if any.
+ remain: Option<usize>,
+ /// The file-format-specific morselizer used to plan files.
+ morselizer: Box<dyn Morselizer>,
+ /// Describes the behavior if opening or scanning a file fails.
+ on_error: OnError,
+ /// CPU-ready planners for the current file.
+ ready_planners: VecDeque<Box<dyn MorselPlanner>>,
+ /// Ready morsels for the current file.
+ ready_morsels: VecDeque<Box<dyn Morsel>>,
+ /// The active reader, if any.
+ reader: Option<BoxStream<'static, Result<RecordBatch>>>,
Review Comment:
Is there one `ScanState` across all partitions or one per partition? I'm
guessing the latter: `file_iter: VecDeque<PartitionedFile>` is the files for
this partition, we pump all of the files into one output stream of
`RecordBatch` (`reader`). But we can have multiple planners / morsels ready and
merge those all into a single stream of `RecordBatch` on the way out.
##########
datafusion/datasource/src/file_stream/metrics.rs:
##########
Review Comment:
Ignoring, done in #21340
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]