alamb commented on code in PR #21882:
URL: https://github.com/apache/datafusion/pull/21882#discussion_r3438657558


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -3098,9 +3098,9 @@ mod test {
         let input_partitions = vec![partition1, partition2];
 
         // Set up context with tight memory limit to force spilling
-        // Sorting needs some non-spillable memory, so 64 bytes should force spilling while still allowing the query to complete
+        // Sorting needs some non-spillable memory, so 608 bytes should force spilling while still allowing the query to complete

Review Comment:
   Why did this test need to change? It seems to me like we shouldn't have to 
change existing tests for a new pluggable backend -- shouldn't the new code 
only be exercised if you have explicitly added a new spilling backend?



##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -274,19 +258,73 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }
 
-/// Write in Arrow IPC Stream format to a file.
-///
-/// Stream format is used for spill because it supports dictionary replacement, and the random
-/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
+/// An adapter that implements `std::io::Write` to bridge Arrow's synchronous

Review Comment:
   This looks a lot like a `BufWriter` -- and it seems like this requires a 
separate copy. 
   
   Wouldn't a better API just be to write directly to the inner spill writer?



##########
datafusion/execution/src/spill_file.rs:
##########
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use datafusion_common::Result;
+use futures::Stream;
+use std::path::Path;
+use std::pin::Pin;
+use std::sync::Arc;
+
+/// Abstraction over a spill file backend.
+/// Implementations handle their own quota enforcement and blocking concerns.
+pub trait SpillFile: Send + Sync {
+    /// Returns the OS path if this is a local file, None otherwise.
+    fn path(&self) -> Option<&Path> {
+        None
+    }
+
+    /// Returns current size in bytes if cheaply available.
+    fn size(&self) -> Option<u64>;
+
+    /// Returns file contents as an async stream of byte chunks.
+    fn read_stream(&self) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>>;
+
+    /// Opens a writer for appending data to this file.
+    fn open_writer(&self) -> Result<Box<dyn SpillWriter>>;
+
+    /// Opens a synchronous reader for this file.
+    /// Used by legacy operators (like SortMergeJoin) that haven't been fully migrated to async.
+    ///
+    /// Backends that only support async reads should leave this default implementation,
+    /// which will safely return a NotImplemented error if used in synchronous contexts.
+    fn open_sync_reader(&self) -> Result<Box<dyn std::io::Read + Send>> {
+        datafusion_common::exec_err!(
+            "Synchronous reads are not supported by this spill backend. \
+            This backend cannot be used with synchronous operators like SortMergeJoin \
+            until they are refactored to be fully asynchronous."
+        )
+    }
+}
+
+/// Writer for spill file backends.
+/// Receives zero-copy `Bytes` payloads from the IPCStreamWriter adapter.
+pub trait SpillWriter: Send {
+    fn write(&mut self, data: Bytes) -> Result<()>;

Review Comment:
   This is all true -- however, I think that since the underlying IPC writer 
takes a `std::io::Write`, forcing all backends to use `Bytes` will likely require 
an extra unnecessary copy anyway (see comments below on SpillWriterAdapter).
   
   If you use a `std::io::Write`-like interface here, backends that want to queue 
chunks can do so (by copying into `Bytes` buffers themselves).
   
   Thus what I suggest is to change this to look more like `std::io::Write`:
   
   ```rust
       fn write(&mut self, data: &[u8]) -> Result<()>;
   ```
   
   That will allow you to get rid of the write adapter.
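
To illustrate the point about backends doing their own chunking, here is a rough sketch (not code from this PR). `ChunkingSpillWriter`, its `chunk_size` parameter, and its `queued` field are hypothetical names made up for this example. It implements `std::io::Write` directly rather than a custom trait, and `Vec<u8>` stands in for `Bytes` so that only the standard library is needed.

```rust
use std::io::{self, Write};

/// Hypothetical backend (not part of the PR) that queues fixed-size chunks.
struct ChunkingSpillWriter {
    chunk_size: usize,
    /// Chunk currently being filled.
    current: Vec<u8>,
    /// Completed chunks, e.g. ready to hand off to an async uploader.
    queued: Vec<Vec<u8>>,
}

impl ChunkingSpillWriter {
    fn new(chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        Self {
            chunk_size,
            current: Vec::with_capacity(chunk_size),
            queued: Vec::new(),
        }
    }
}

impl Write for ChunkingSpillWriter {
    fn write(&mut self, mut data: &[u8]) -> io::Result<usize> {
        let written = data.len();
        while !data.is_empty() {
            let room = self.chunk_size - self.current.len();
            let take = room.min(data.len());
            // The only copy happens here, and the backend chose to make it.
            self.current.extend_from_slice(&data[..take]);
            data = &data[take..];
            if self.current.len() == self.chunk_size {
                let full = std::mem::replace(
                    &mut self.current,
                    Vec::with_capacity(self.chunk_size),
                );
                self.queued.push(full);
            }
        }
        Ok(written)
    }

    fn flush(&mut self) -> io::Result<()> {
        // Push any partially filled chunk so no bytes are left behind.
        if !self.current.is_empty() {
            let partial = std::mem::take(&mut self.current);
            self.queued.push(partial);
        }
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut writer = ChunkingSpillWriter::new(4);
    writer.write_all(b"0123456789")?;
    writer.flush()?;
    let sizes: Vec<usize> = writer.queued.iter().map(Vec::len).collect();
    println!("{sizes:?}");
    Ok(())
}
```

Because the type is a plain `std::io::Write`, something like the Arrow IPC writer could in principle write into it without an intermediate adapter, while a local-file backend could simply pass a `File` through with no chunking at all.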



##########
datafusion/physical-plan/src/spill/mod.rs:
##########


Review Comment:
   Update: here is the PR:
   - https://github.com/apache/datafusion/pull/23029



##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -39,212 +39,187 @@ use arrow::array::{
     Array, ArrayRef, BinaryViewArray, BufferSpec, GenericByteViewArray, StringViewArray,
     layout, make_array,
 };
+use arrow::buffer::Buffer;
 use arrow::datatypes::DataType;
 use arrow::datatypes::{ByteViewType, Schema, SchemaRef};
 use arrow::ipc::{
     MetadataVersion,
-    reader::StreamReader,
+    reader::StreamDecoder,
     writer::{IpcWriteOptions, StreamWriter},
 };
 use arrow::record_batch::RecordBatch;
 use arrow_data::ArrayDataBuilder;
 use arrow_ipc::CompressionType;
 
 use datafusion_common::config::SpillCompression;
-use datafusion_common::{DataFusionError, Result, exec_datafusion_err, exec_err};
-use datafusion_common_runtime::SpawnedTask;
+use datafusion_common::{DataFusionError, Result, exec_datafusion_err};
 use datafusion_execution::RecordBatchStream;
-use datafusion_execution::disk_manager::RefCountedTempFile;
-use futures::{FutureExt as _, Stream};
+use datafusion_execution::spill_file::SpillFile;
+use futures::Stream;
 use log::debug;
 
-/// Stream that reads spill files from disk where each batch is read in a spawned blocking task
-/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
-///
-/// A simpler solution would be spawning a long-running blocking task for each
-/// file read (instead of each batch). This approach does not work because when
-/// the number of concurrent reads exceeds the Tokio thread pool limit,
-/// deadlocks can occur and block progress.
+/// Stream that reads spill files from a [`SpillFile`] backend as a stream of [`RecordBatch`]es.
+/// Uses [`StreamDecoder`] to decode IPC bytes received from the backend's async byte stream.
+/// Backends handle their own threading concerns internally - OS files use
+/// `tokio::fs::File` which performs blocking IO per-syscall without holding a thread
+/// for the file's lifetime, avoiding deadlocks when concurrent reads exceed thread pool limits.
 struct SpillReaderStream {
     schema: SchemaRef,
-    state: SpillReaderStreamState,
+    decoder: StreamDecoder,
+    byte_stream: Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send>>,

Review Comment:
   this seems ok, though I do worry a little that decoding now requires an 
extra copy (read from File -> Bytes, and then read from Bytes into the internal 
buffers of the IPC decoder)



##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -39,212 +39,187 @@ use arrow::array::{
     Array, ArrayRef, BinaryViewArray, BufferSpec, GenericByteViewArray, StringViewArray,
     layout, make_array,
 };
+use arrow::buffer::Buffer;
 use arrow::datatypes::DataType;
 use arrow::datatypes::{ByteViewType, Schema, SchemaRef};
 use arrow::ipc::{
     MetadataVersion,
-    reader::StreamReader,
+    reader::StreamDecoder,
     writer::{IpcWriteOptions, StreamWriter},
 };
 use arrow::record_batch::RecordBatch;
 use arrow_data::ArrayDataBuilder;
 use arrow_ipc::CompressionType;
 
 use datafusion_common::config::SpillCompression;
-use datafusion_common::{DataFusionError, Result, exec_datafusion_err, exec_err};
-use datafusion_common_runtime::SpawnedTask;
+use datafusion_common::{DataFusionError, Result, exec_datafusion_err};
 use datafusion_execution::RecordBatchStream;
-use datafusion_execution::disk_manager::RefCountedTempFile;
-use futures::{FutureExt as _, Stream};
+use datafusion_execution::spill_file::SpillFile;
+use futures::Stream;
 use log::debug;
 
-/// Stream that reads spill files from disk where each batch is read in a spawned blocking task
-/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
-///
-/// A simpler solution would be spawning a long-running blocking task for each
-/// file read (instead of each batch). This approach does not work because when
-/// the number of concurrent reads exceeds the Tokio thread pool limit,
-/// deadlocks can occur and block progress.
+/// Stream that reads spill files from a [`SpillFile`] backend as a stream of [`RecordBatch`]es.
+/// Uses [`StreamDecoder`] to decode IPC bytes received from the backend's async byte stream.
+/// Backends handle their own threading concerns internally - OS files use
+/// `tokio::fs::File` which performs blocking IO per-syscall without holding a thread
+/// for the file's lifetime, avoiding deadlocks when concurrent reads exceed thread pool limits.
 struct SpillReaderStream {
     schema: SchemaRef,
-    state: SpillReaderStreamState,
+    decoder: StreamDecoder,
+    byte_stream: Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send>>,
+    is_done: bool,
+
     /// Maximum memory size observed among spilling sorted record batches.
     /// This is used for validation purposes during reading each RecordBatch 
from spill.
     /// For context on why this value is recorded and validated,
     /// see `physical_plan/sort/multi_level_merge.rs`.
     max_record_batch_memory: Option<usize>,
-}
 
-// Small margin allowed to accommodate slight memory accounting variation
-const SPILL_BATCH_MEMORY_MARGIN: usize = 4096;
+    /// Holds leftover bytes from a chunk when a batch is yielded early
+    current_buffer: Buffer,
 
-/// When we poll for the next batch, we will get back both the batch and the 
reader,
-/// so we can call `next` again.
-type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>, 
Option<RecordBatch>)>;
+    /// Keeps the file alive until the stream is dropped
+    _spill_file: Arc<dyn SpillFile>,
 
-enum SpillReaderStreamState {
-    /// Initial state: the stream was not initialized yet
-    /// and the file was not opened
-    Uninitialized(RefCountedTempFile),
-
-    /// A read is in progress in a spawned blocking task for which we hold the 
handle.
-    ReadInProgress(SpawnedTask<NextRecordBatchResult>),
-
-    /// A read has finished and we wait for being polled again in order to 
start reading the next batch.
-    Waiting(StreamReader<BufReader<File>>),
-
-    /// The stream has finished, successfully or not.
-    Done,
+    schema_validated: bool,
 }
 
+// Small margin allowed to accommodate slight memory accounting variation
+const SPILL_BATCH_MEMORY_MARGIN: usize = 4096;
+
 impl SpillReaderStream {
     fn new(
         schema: SchemaRef,
-        spill_file: RefCountedTempFile,
+        spill_file: Arc<dyn SpillFile>,
         max_record_batch_memory: Option<usize>,
-    ) -> Self {
-        Self {
+    ) -> Result<Self> {
+        let byte_stream = spill_file.read_stream()?;
+        // DataFusion controls what it writes so it can trust its own IPC 
output,
+        // matching the behavior of the previous StreamReader-based 
implementation.
+        let decoder = unsafe { StreamDecoder::new().with_skip_validation(true) 
};
+        Ok(Self {
             schema,
-            state: SpillReaderStreamState::Uninitialized(spill_file),
+            decoder,
+            byte_stream,
             max_record_batch_memory,
-        }
+            is_done: false,
+            current_buffer: Buffer::from(&[]),
+            _spill_file: spill_file,
+            schema_validated: false,
+        })
     }
+}
 
-    fn poll_next_inner(
-        &mut self,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Result<RecordBatch>>> {
-        match &mut self.state {
-            SpillReaderStreamState::Uninitialized(_) => {
-                // Temporarily replace with `Done` to be able to pass the file 
to the task.
-                let SpillReaderStreamState::Uninitialized(spill_file) =
-                    std::mem::replace(&mut self.state, 
SpillReaderStreamState::Done)
-                else {
-                    unreachable!()
-                };
-
-                let expected_schema = Arc::clone(&self.schema);
-                let task = SpawnedTask::spawn_blocking(move || {
-                    let file = BufReader::new(File::open(spill_file.path())?);
-                    // SAFETY: DataFusion's spill writer strictly follows 
Arrow IPC specifications
-                    // with validated schemas and buffers. Skip redundant 
validation during read
-                    // to speedup read operation. This is safe for DataFusion 
as input guaranteed to be correct when written.
-                    let mut reader = unsafe {
-                        StreamReader::try_new(file, 
None)?.with_skip_validation(true)
-                    };
-
-                    // Validate the schema read from Arrow IPC file is the 
same as the
-                    // schema of the current `SpillManager`
-                    let actual_schema = reader.schema();
-
-                    if actual_schema != expected_schema {
-                        return exec_err!(
-                            "Spill file schema mismatch: expected {}, got {}. \
-                            The caller must use the same SpillManager that 
created the spill file to read it.",
-                            expected_schema,
-                            actual_schema
-                        );
-                    }
-
-                    // TODO: Same-schema reads from a different SpillManager 
still pass today.
-                    // Add a SpillManager UID to IPC metadata and validate it 
here as well.
-                    let next_batch = reader.next().transpose()?;
-
-                    Ok((reader, next_batch))
-                });
+impl Stream for SpillReaderStream {
+    type Item = Result<RecordBatch>;
 
-                self.state = SpillReaderStreamState::ReadInProgress(task);
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+        let this = self.get_mut();
 
-                // Poll again immediately so the inner task is polled and the 
waker is
-                // registered.
-                self.poll_next_inner(cx)
-            }
+        if this.is_done {
+            return Poll::Ready(None);
+        }
 
-            SpillReaderStreamState::ReadInProgress(task) => {
-                let result = futures::ready!(task.poll_unpin(cx))
-                    .unwrap_or_else(|err| 
Err(DataFusionError::External(Box::new(err))));
-
-                match result {
-                    Ok((reader, batch)) => {
-                        match batch {
-                            Some(batch) => {
-                                if let Some(max_record_batch_memory) =
-                                    self.max_record_batch_memory
-                                {
-                                    let actual_size =
-                                        get_record_batch_memory_size(&batch);
-                                    if actual_size
-                                        > max_record_batch_memory
-                                            + SPILL_BATCH_MEMORY_MARGIN
-                                    {
-                                        debug!(
-                                            "Record batch memory usage 
({actual_size} bytes) exceeds the expected limit ({max_record_batch_memory} 
bytes) \n\
-                                                by more than the allowed 
tolerance ({SPILL_BATCH_MEMORY_MARGIN} bytes).\n\
-                                                This likely indicates a bug in 
memory accounting during spilling.\n\
-                                                Please report this issue in 
https://github.com/apache/datafusion/issues/17340.";
-                                        );
-                                    }
-                                }
-                                self.state = 
SpillReaderStreamState::Waiting(reader);
-
-                                Poll::Ready(Some(Ok(batch)))
+        loop {
+            if !this.current_buffer.is_empty() {
+                match this.decoder.decode(&mut this.current_buffer) {
+                    Ok(Some(batch)) => {
+                        // One-time schema validation on the first decoded 
batch.
+                        // The IPC stream embeds the writer's schema in its 
header;
+                        // StreamDecoder surfaces it via the first batch's 
schema.
+                        // We check here rather than in new() because schema 
bytes
+                        // only arrive after decoding the IPC header from the 
stream.
+                        if !this.schema_validated {
+                            this.schema_validated = true;
+                            let actual = batch.schema();
+                            if actual != this.schema {
+                                this.is_done = true;
+                                return Poll::Ready(Some(Err(
+                                    datafusion_common::exec_datafusion_err!(
+                                        "Spill file schema mismatch: expected 
{}, got {}. \
+                     The caller must use the same SpillManager that created \
+                     the spill file to read it.",
+                                        this.schema,
+                                        actual
+                                    ),
+                                )));
                             }
-                            None => {
-                                // Stream is done
-                                self.state = SpillReaderStreamState::Done;
-
-                                Poll::Ready(None)
+                        }
+                        if let Some(max_record_batch_memory) =
+                            this.max_record_batch_memory
+                        {
+                            let actual_size = 
get_record_batch_memory_size(&batch);
+                            if actual_size
+                                > max_record_batch_memory + 
SPILL_BATCH_MEMORY_MARGIN
+                            {
+                                debug!(
+                                    "Record batch memory usage ({actual_size} 
bytes) exceeds the expected limit ({max_record_batch_memory} bytes) \n\
+                                        by more than the allowed tolerance 
({SPILL_BATCH_MEMORY_MARGIN} bytes).\n\
+                                        This likely indicates a bug in memory 
accounting during spilling."
+                                );
                             }
                         }
+                        return Poll::Ready(Some(Ok(batch)));
                     }
-                    Err(err) => {
-                        self.state = SpillReaderStreamState::Done;
-
-                        Poll::Ready(Some(Err(err)))
+                    Ok(None) => {
+                        // The chunk didn't form a complete message. Arrow 
consumed the partial bytes
+                        // into its internal scratch pad, leaving our 
current_buffer completely empty.
+                        // We do nothing and fall through to fetch more data.
+                    }
+                    Err(e) => {
+                        this.is_done = true;
+                        return Poll::Ready(Some(Err(e.into())));
                     }
                 }
             }
 
-            SpillReaderStreamState::Waiting(_) => {
-                // Temporarily replace with `Done` to be able to pass the file 
to the task.
-                let SpillReaderStreamState::Waiting(mut reader) =
-                    std::mem::replace(&mut self.state, 
SpillReaderStreamState::Done)
-                else {
-                    unreachable!()
-                };
-
-                let task = SpawnedTask::spawn_blocking(move || {
-                    let next_batch = reader.next().transpose()?;
-
-                    Ok((reader, next_batch))
-                });
-
-                self.state = SpillReaderStreamState::ReadInProgress(task);
+            match futures::ready!(this.byte_stream.as_mut().poll_next(cx)) {
+                Some(Ok(chunk)) => {
+                    this.current_buffer = Buffer::from(chunk);
+                }
+                Some(Err(e)) => {
+                    this.is_done = true;
+                    return Poll::Ready(Some(Err(e)));
+                }
+                None => {
+                    this.is_done = true;
 
-                // Poll again immediately so the inner task is polled and the 
waker is
-                // registered.
-                self.poll_next_inner(cx)
+                    if let Err(e) = this.decoder.finish() {
+                        return Poll::Ready(Some(Err(e.into())));
+                    }
+                    return Poll::Ready(None);
+                }
             }
-
-            SpillReaderStreamState::Done => Poll::Ready(None),
         }
     }
 }
 
-impl Stream for SpillReaderStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
-        self.get_mut().poll_next_inner(cx)
-    }
-}
-
 impl RecordBatchStream for SpillReaderStream {
     fn schema(&self) -> SchemaRef {
         Arc::clone(&self.schema)
     }
 }
 
+/// A simple, unmonitored file writer to support the deprecated `spill_record_batch_by_size` API.

Review Comment:
   what does "unmonitored" mean?
   



##########
datafusion/physical-plan/src/spill/mod.rs:
##########


Review Comment:
   Given this method has been deprecated since DataFusion 46 I think we can 
remove it (and make this PR simpler). 



##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -402,59 +452,7 @@ impl RefCountedTempFile {
         self.tempfile.as_ref()
     }
 
-    /// Updates the global disk usage counter after modifications to the underlying file.
-    ///
-    /// # Errors
-    /// - Returns an error if the global disk usage exceeds the configured limit.
-    pub fn update_disk_usage(&mut self) -> Result<()> {
-        // Get new file size from OS
-        let metadata = self.tempfile.as_file().metadata()?;
-        let new_disk_usage = metadata.len();
-
-        // Get the old disk usage

Review Comment:
   What happened to this code?



##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -89,19 +100,28 @@ impl DiskManagerBuilder {
                     max_temp_directory_size: self.max_temp_directory_size,
                     used_disk_space: Arc::new(AtomicU64::new(0)),
                     active_files_count: Arc::new(AtomicUsize::new(0)),
+                    custom_factory: None,
                 })
             }
             DiskManagerMode::Disabled => Ok(DiskManager {
                 local_dirs: Mutex::new(None),
                 max_temp_directory_size: self.max_temp_directory_size,
                 used_disk_space: Arc::new(AtomicU64::new(0)),
                 active_files_count: Arc::new(AtomicUsize::new(0)),
+                custom_factory: None,
+            }),
+            DiskManagerMode::Custom(factory) => Ok(DiskManager {
+                local_dirs: Mutex::new(None),
+                max_temp_directory_size: self.max_temp_directory_size,
+                used_disk_space: Arc::new(AtomicU64::new(0)),
+                active_files_count: Arc::new(AtomicUsize::new(0)),
+                custom_factory: Some(factory),
             }),
         }
     }
 }
 
-#[derive(Clone, Debug, Default)]
+#[derive(Clone, Default)]
 pub enum DiskManagerMode {

Review Comment:
   maybe this is not a super useful thing to do at the moment



##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -274,19 +258,73 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }
 
-/// Write in Arrow IPC Stream format to a file.
-///
-/// Stream format is used for spill because it supports dictionary replacement, and the random
-/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
+/// An adapter that implements `std::io::Write` to bridge Arrow's synchronous

Review Comment:
   After further review it seems the adapter is needed because the Arrow IPC 
writer writes directly to a `std::io::Write`, which could be a `File` or 
something else that doesn't require a second copy. 
   
   I actually think keeping `std::io::Write` is the most flexible option here (as the 
backend itself can decide to buffer into chunks and write asynchronously, etc.). I will 
comment on the main API.
   



##########
datafusion/execution/src/spill_file.rs:
##########
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use datafusion_common::Result;
+use futures::Stream;
+use std::path::Path;
+use std::pin::Pin;
+use std::sync::Arc;
+
+/// Abstraction over a spill file backend.
+/// Implementations handle their own quota enforcement and blocking concerns.
+pub trait SpillFile: Send + Sync {
+    /// Returns the OS path if this is a local file, None otherwise.
+    fn path(&self) -> Option<&Path> {
+        None
+    }
+
+    /// Returns current size in bytes if cheaply available.
+    fn size(&self) -> Option<u64>;
+
+    /// Returns file contents as an async stream of byte chunks.
+    fn read_stream(&self) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>>;

Review Comment:
   Similarly to the writer API comment below, it seems like this will require 
all backends to copy data, because the Arrow `StreamReader` reads 
from a `std::io::Read`:
   
   https://docs.rs/arrow-ipc/59.0.0/arrow_ipc/reader/struct.StreamReader.html
   
   Ideally the `StreamReader` and `StreamWriter` could offer an `async` variant, but 
until that happens, it seems like it would be better to have this API return a 
reader itself.
   
   Something like:
   ```rust
       fn read_stream(&self) -> Result<Box<dyn std::io::Read + Send>>;
   ```
   
   ## Alternate idea
   
   The other idea we could explore would be to define `SpillFile` in terms of 
`RecordBatch`es, so that it is itself responsible for storing and retrieving 
streams of `RecordBatch`es (rather than streams of `Bytes`). That would give the 
backend more flexibility in how it does IO.
   
   The more I talk about that the better I like this idea.
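
For the reader-returning variant, a minimal sketch of what a backend could look like is below (again, not code from this PR). The trait name `SpillFileRead`, the method `open_reader`, the type `InMemorySpillFile`, and the helper `drain` are all hypothetical; `drain` merely stands in for whatever consumes the reader, such as an IPC stream reader.

```rust
use std::io::{self, Cursor, Read};
use std::sync::Arc;

/// Hypothetical read-side trait shaped like the suggestion above.
trait SpillFileRead: Send + Sync {
    fn open_reader(&self) -> io::Result<Box<dyn Read + Send>>;
}

/// Hypothetical in-memory backend holding the spilled bytes.
struct InMemorySpillFile {
    data: Arc<[u8]>,
}

impl SpillFileRead for InMemorySpillFile {
    fn open_reader(&self) -> io::Result<Box<dyn Read + Send>> {
        // Cloning the Arc shares the buffer; no bytes are copied here.
        Ok(Box::new(Cursor::new(Arc::clone(&self.data))))
    }
}

/// Stand-in for the consumer of the reader (e.g. an IPC stream reader).
fn drain(mut reader: impl Read) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    reader.read_to_end(&mut out)?;
    Ok(out)
}

fn main() -> io::Result<()> {
    let file = InMemorySpillFile {
        data: Arc::from(b"spilled bytes".to_vec()),
    };
    let bytes = drain(file.open_reader()?)?;
    assert_eq!(bytes, b"spilled bytes");
    Ok(())
}
```

A local-file backend would return a `File` (or a `BufReader<File>`) from the same method, so the consumer reads from the source directly instead of going through an intermediate stream of byte chunks.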



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

