alamb commented on code in PR #21351:
URL: https://github.com/apache/datafusion/pull/21351#discussion_r3080654356
##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -580,6 +582,15 @@ impl DataSource for FileScanConfig {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
+ self.open_with_args(OpenArgs::new(partition, context))
+ }
+
+ fn open_with_args(&self, args: OpenArgs) ->
Result<SendableRecordBatchStream> {
Review Comment:
Added an `open_with_args` API to mirror other `with_args` APIs such as
`TableSource::scan_with_args`:
https://github.com/apache/datafusion/blob/bb1c8e658942e8b8c4bd6d7636dd9eb52c395d54/datafusion/catalog/src/table.rs#L209-L221
The new API is required to pass in the shared state (i.e. to connect sibling
streams so they can share / reorder work)
##########
datafusion/datasource/src/file_stream/mod.rs:
##########
@@ -1001,11 +1010,265 @@ mod tests {
Ok(())
}
- /// Tests how FileStream opens and processes files.
+ /// Return a morsel test with two partitions:
+ /// Partition 0: file1, file2, file3
+ /// Partition 1: file4
+ ///
+ /// Partition 1 has only 1 file but it polled first 4 times
+ fn two_partition_morsel_test() -> FileStreamMorselTest {
+ FileStreamMorselTest::new()
+ // Partition 0 has three files
+ .with_file_in_partition(
+ PartitionId(0),
+ MockPlanner::builder("file1.parquet")
+ .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10),
101))
+ .return_none(),
+ )
+ .with_file_in_partition(
+ PartitionId(0),
+ MockPlanner::builder("file2.parquet")
+ .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11),
102))
+ .return_none(),
+ )
+ .with_file_in_partition(
+ PartitionId(0),
+ MockPlanner::builder("file3.parquet")
+ .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12),
103))
+ .return_none(),
+ )
+ // Partition 1 has only one file, but is polled first
+ .with_file_in_partition(
+ PartitionId(1),
+ MockPlanner::builder("file4.parquet")
+ .add_plan(MockPlanBuilder::new().with_morsel(MorselId(13),
201))
+ .return_none(),
+ )
+ .with_reads(vec![
+ PartitionId(1),
+ PartitionId(1),
+ PartitionId(1),
+ PartitionId(1),
+ PartitionId(1),
+ ])
+ }
+
+ /// Verifies that an idle sibling stream can steal shared files from
+ /// another stream once it exhausts its own local work.
+ #[tokio::test]
+ async fn morsel_shared_files_can_be_stolen() -> Result<()> {
+ let test = two_partition_morsel_test().with_file_stream_events(false);
+
+ // Partition 0 starts with 3 files, but Partition 1 is polled first.
+ // Since Partition is polled first, it will run all the files even
those
+ // that were assigned to Partition 0.
+ insta::assert_snapshot!(test.run().await.unwrap(), @r"
+ ----- Partition 0 -----
+ Done
+ ----- Partition 1 -----
+ Batch: 101
Review Comment:
note that partition 1 has run all the files
##########
datafusion/datasource/src/file_stream/work_source.rs:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use crate::PartitionedFile;
+use crate::file_groups::FileGroup;
+use crate::file_scan_config::FileScanConfig;
+use parking_lot::Mutex;
+
+/// Source of work for `ScanState`.
+///
+/// Streams that may share work across siblings use [`WorkSource::Shared`],
+/// while streams that can not share work (e.g. because they must preserve file
+/// order) use [`WorkSource::Local`].
+#[derive(Debug, Clone)]
+pub(super) enum WorkSource {
+ /// Files this stream will plan locally without sharing them.
+ Local(VecDeque<PartitionedFile>),
+ /// Files shared with sibling streams.
+ Shared(SharedWorkSource),
+}
+
+impl WorkSource {
+ /// Pop the next file to plan from this work source.
+ pub(super) fn pop_front(&mut self) -> Option<PartitionedFile> {
+ match self {
+ Self::Local(files) => files.pop_front(),
+ Self::Shared(shared) => shared.pop_front(),
+ }
+ }
+
+ /// Return the number of files that are still waiting to be planned.
+ pub(super) fn len(&self) -> usize {
+ match self {
+ Self::Local(files) => files.len(),
+ Self::Shared(shared) => shared.len(),
+ }
+ }
+}
+
+/// Shared source of work for sibling `FileStream`s
Review Comment:
At the moment, the work source only supports entire files, but I can imagine
it getting more sophisticated and supporting morsels and morsel planners too
(to do work stealing, etc)
##########
datafusion/datasource/src/file_stream/mod.rs:
##########
@@ -24,6 +24,7 @@
mod builder;
mod metrics;
mod scan_state;
+pub(crate) mod work_source;
Review Comment:
Note that there are no changes to the `FileStream` itself -- these are only test changes
##########
datafusion/datasource/src/file_stream/scan_state.rs:
##########
@@ -62,8 +62,8 @@ use super::{FileStreamMetrics, OnError};
///
/// [`FileStreamState::Scan`]: super::FileStreamState::Scan
pub(super) struct ScanState {
- /// Files that still need to be planned.
- file_iter: VecDeque<PartitionedFile>,
+ /// Unopened files that still need to be planned for this stream.
+ work_source: WorkSource,
Review Comment:
Here is the key difference -- instead of a local queue there is now a
(potentially) shared work source
##########
datafusion/datasource/src/source.rs:
##########
@@ -246,6 +257,55 @@ pub trait DataSource: Send + Sync + Debug {
) -> Option<Arc<dyn DataSource>> {
None
}
+
+ /// Create per execution state to share across sibling instances of this
Review Comment:
These are the key new APIs
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]