alamb commented on code in PR #21342:
URL: https://github.com/apache/datafusion/pull/21342#discussion_r3059538870
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -511,11 +512,22 @@ impl From<ParquetSource> for Arc<dyn FileSource> {
impl FileSource for ParquetSource {
fn create_file_opener(
+ &self,
+ _object_store: Arc<dyn ObjectStore>,
+ _base_config: &FileScanConfig,
+ _partition: usize,
+ ) -> datafusion_common::Result<Arc<dyn FileOpener>> {
+ datafusion_common::internal_err!(
+ "ParquetSource::create_file_opener called but it supports the
Morsel API"
Review Comment:
added
##########
datafusion/datasource/src/file_stream/mod.rs:
##########
@@ -904,4 +679,334 @@ mod tests {
);
assert!(err.contains("FileStreamBuilder invalid partition index: 1"));
}
+
+ /// Verifies the simplest morsel-driven flow: one planner produces one
+ /// morsel immediately, and that morsel is then scanned to completion.
+ #[tokio::test]
+ async fn morsel_no_io() -> Result<()> {
+ let test = FileStreamMorselTest::new().with_file(
+ MockPlanner::builder("file1.parquet")
+ .return_morsel(MorselId(10), 42)
+ .return_none()
+ .build(),
+ );
+
+ insta::assert_snapshot!(test.run().await.unwrap(), @r"
+ ----- Output Stream -----
+ Batch: 42
+ Done
+ ----- File Stream Events -----
+ morselize_file: file1.parquet
+ planner_created: file1.parquet
+ planner_called: file1.parquet
+ morsel_produced: file1.parquet, MorselId(10)
+ morsel_stream_started: MorselId(10)
+ morsel_stream_batch_produced: MorselId(10), BatchId(42)
+ morsel_stream_finished: MorselId(10)
+ ");
+
+ Ok(())
+ }
+
+ /// Verifies that a planner can block on one I/O phase and then produce a
+ /// morsel containing two batches.
+ #[tokio::test]
+ async fn morsel_single_io_two_batches() -> Result<()> {
+ let test = FileStreamMorselTest::new().with_file(
+ MockPlanner::builder("file1.parquet")
+ .return_io(IoFutureId(1), PollsToResolve(1))
+ .return_morsel_batches(MorselId(10), vec![42, 43])
+ .return_none()
+ .build(),
+ );
+
+ insta::assert_snapshot!(test.run().await.unwrap(), @r"
+ ----- Output Stream -----
+ Batch: 42
+ Batch: 43
+ Done
+ ----- File Stream Events -----
+ morselize_file: file1.parquet
+ planner_created: file1.parquet
+ planner_called: file1.parquet
+ io_future_created: file1.parquet, IoFutureId(1)
+ io_future_polled: file1.parquet, IoFutureId(1)
+ io_future_polled: file1.parquet, IoFutureId(1)
+ io_future_resolved: file1.parquet, IoFutureId(1)
+ planner_called: file1.parquet
+ morsel_produced: file1.parquet, MorselId(10)
+ morsel_stream_started: MorselId(10)
+ morsel_stream_batch_produced: MorselId(10), BatchId(42)
+ morsel_stream_batch_produced: MorselId(10), BatchId(43)
+ morsel_stream_finished: MorselId(10)
+ ");
+
+ Ok(())
+ }
+
+ /// Verifies that a planner can traverse two sequential I/O phases before
+ /// producing one batch (similar to Parquet, which does this).
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]