alamb commented on code in PR #21342:
URL: https://github.com/apache/datafusion/pull/21342#discussion_r3059786046


##########
datafusion/datasource/src/file_stream/mod.rs:
##########
@@ -904,4 +679,334 @@ mod tests {
         );
         assert!(err.contains("FileStreamBuilder invalid partition index: 1"));
     }
+
+    /// Verifies the simplest morsel-driven flow: one planner produces one
+    /// morsel immediately, and that morsel is then scanned to completion.
+    #[tokio::test]
+    async fn morsel_no_io() -> Result<()> {
+        let test = FileStreamMorselTest::new().with_file(
+            MockPlanner::builder("file1.parquet")
+                .return_morsel(MorselId(10), 42)
+                .return_none()
+                .build(),
+        );
+
+        insta::assert_snapshot!(test.run().await.unwrap(), @r"
+        ----- Output Stream -----
+        Batch: 42
+        Done
+        ----- File Stream Events -----
+        morselize_file: file1.parquet
+        planner_created: file1.parquet
+        planner_called: file1.parquet
+        morsel_produced: file1.parquet, MorselId(10)
+        morsel_stream_started: MorselId(10)
+        morsel_stream_batch_produced: MorselId(10), BatchId(42)
+        morsel_stream_finished: MorselId(10)
+        ");
+
+        Ok(())
+    }
+
+    /// Verifies that a planner can block on one I/O phase and then produce a
+    /// morsel containing two batches.
+    #[tokio::test]
+    async fn morsel_single_io_two_batches() -> Result<()> {
+        let test = FileStreamMorselTest::new().with_file(
+            MockPlanner::builder("file1.parquet")
+                .return_io(IoFutureId(1), PollsToResolve(1))
+                .return_morsel_batches(MorselId(10), vec![42, 43])
+                .return_none()
+                .build(),
+        );
+
+        insta::assert_snapshot!(test.run().await.unwrap(), @r"
+        ----- Output Stream -----
+        Batch: 42
+        Batch: 43
+        Done
+        ----- File Stream Events -----
+        morselize_file: file1.parquet
+        planner_created: file1.parquet
+        planner_called: file1.parquet
+        io_future_created: file1.parquet, IoFutureId(1)
+        io_future_polled: file1.parquet, IoFutureId(1)
+        io_future_polled: file1.parquet, IoFutureId(1)
+        io_future_resolved: file1.parquet, IoFutureId(1)
+        planner_called: file1.parquet
+        morsel_produced: file1.parquet, MorselId(10)
+        morsel_stream_started: MorselId(10)
+        morsel_stream_batch_produced: MorselId(10), BatchId(42)
+        morsel_stream_batch_produced: MorselId(10), BatchId(43)
+        morsel_stream_finished: MorselId(10)
+        ");
+
+        Ok(())
+    }
+
+    /// Verifies that a planner can traverse two sequential I/O phases before
+    /// producing one batch (similar to Parquet, which does this).
+    #[tokio::test]
+    async fn morsel_two_ios_one_batch() -> Result<()> {

Review Comment:
   Also, it may not be multiple actual IOs, but rather that the IO future isn't
   ready for a few polls.



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -425,85 +408,6 @@ impl ParquetOpenState {
     }
 }
 
-/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API

Review Comment:
   Shim layer removed, as the FileStream now uses the Morsel API natively.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to