hcrosse commented on code in PR #1537:
URL: https://github.com/apache/datafusion-ballista/pull/1537#discussion_r3028594421


##########
ballista/core/src/utils.rs:
##########
@@ -159,42 +159,74 @@ pub fn default_config_producer() -> SessionConfig {
     SessionConfig::new_with_ballista()
 }
 
-/// Stream data to disk in Arrow IPC format
+/// Stream data to disk in Arrow IPC format.
+///
+/// Batches are read from the async stream and forwarded through a bounded
+/// channel to a `spawn_blocking` task that performs all synchronous file I/O,
+/// keeping the tokio worker thread unblocked.
 pub async fn write_stream_to_disk(
     stream: &mut Pin<Box<dyn RecordBatchStream + Send>>,
     path: &str,
     disk_write_metric: &metrics::Time,
 ) -> Result<PartitionStats> {
-    let file = BufWriter::new(File::create(path).map_err(|e| {
-        error!("Failed to create partition file at {path}: {e:?}");
-        BallistaError::IoError(e)
-    })?);
+    let schema = stream.schema();
+    let path_owned = path.to_owned();
+    let write_metric = disk_write_metric.clone();
+
+    let (tx, mut rx) = tokio::sync::mpsc::channel::<RecordBatch>(2);
+
+    let handle = tokio::task::spawn_blocking(move || -> Result<()> {
+        let file = BufWriter::new(File::create(&path_owned).map_err(|e| {
+            error!("Failed to create partition file at {path_owned}: {e:?}");
+            BallistaError::IoError(e)
+        })?);
+
+        let options = IpcWriteOptions::default()
+            .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
+
+        let mut writer =
+            StreamWriter::try_new_with_options(file, schema.as_ref(), 
options)?;
+
+        while let Some(batch) = rx.blocking_recv() {
+            let timer = write_metric.timer();
+            writer.write(&batch)?;
+            timer.done();
+        }
+        let timer = write_metric.timer();
+        writer.finish()?;
+        timer.done();
+        Ok(())
+    });
 
     let mut num_rows = 0;
     let mut num_batches = 0;
     let mut num_bytes = 0;
 
-    let options = IpcWriteOptions::default()
-        .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
-
-    let mut writer =
-        StreamWriter::try_new_with_options(file, stream.schema().as_ref(), 
options)?;
-
-    while let Some(result) = stream.next().await {
-        let batch = result?;
+    let stream_err = loop {
+        match stream.next().await {
+            Some(Ok(batch)) => {
+                num_batches += 1;
+                num_rows += batch.num_rows();
+                num_bytes += batch.get_array_memory_size();
+                if tx.send(batch).await.is_err() {
+                    break None;
+                }
+            }
+            Some(Err(e)) => break Some(e),
+            None => break None,
+        }
+    };
+    drop(tx);
 
-        let batch_size_bytes: usize = batch.get_array_memory_size();
-        num_batches += 1;
-        num_rows += batch.num_rows();
-        num_bytes += batch_size_bytes;
+    let write_result = handle
+        .await
+        .map_err(|e| BallistaError::General(format!("Disk writer task failed: 
{e}")))?;
 
-        let timer = disk_write_metric.timer();
-        writer.write(&batch)?;
-        timer.done();
+    if let Some(e) = stream_err {
+        return Err(e.into());

Review Comment:
   Also logging an `error` for this stream error before it is returned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to