pantShrey commented on code in PR #21882:
URL: https://github.com/apache/datafusion/pull/21882#discussion_r3443798268
##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -39,212 +39,187 @@ use arrow::array::{
Array, ArrayRef, BinaryViewArray, BufferSpec, GenericByteViewArray,
StringViewArray,
layout, make_array,
};
+use arrow::buffer::Buffer;
use arrow::datatypes::DataType;
use arrow::datatypes::{ByteViewType, Schema, SchemaRef};
use arrow::ipc::{
MetadataVersion,
- reader::StreamReader,
+ reader::StreamDecoder,
writer::{IpcWriteOptions, StreamWriter},
};
use arrow::record_batch::RecordBatch;
use arrow_data::ArrayDataBuilder;
use arrow_ipc::CompressionType;
use datafusion_common::config::SpillCompression;
-use datafusion_common::{DataFusionError, Result, exec_datafusion_err,
exec_err};
-use datafusion_common_runtime::SpawnedTask;
+use datafusion_common::{DataFusionError, Result, exec_datafusion_err};
use datafusion_execution::RecordBatchStream;
-use datafusion_execution::disk_manager::RefCountedTempFile;
-use futures::{FutureExt as _, Stream};
+use datafusion_execution::spill_file::SpillFile;
+use futures::Stream;
use log::debug;
-/// Stream that reads spill files from disk where each batch is read in a
spawned blocking task
-/// It will read one batch at a time and will not do any buffering, to buffer
data use [`crate::common::spawn_buffered`]
-///
-/// A simpler solution would be spawning a long-running blocking task for each
-/// file read (instead of each batch). This approach does not work because when
-/// the number of concurrent reads exceeds the Tokio thread pool limit,
-/// deadlocks can occur and block progress.
+/// Stream that reads spill files from a [`SpillFile`] backend as a stream of
[`RecordBatch`]es.
+/// Uses [`StreamDecoder`] to decode IPC bytes received from the backend's
async byte stream.
+/// Backends handle their own threading concerns internally - OS files use
+/// `tokio::fs::File` which performs blocking IO per-syscall without holding a
thread
+/// for the file's lifetime, avoiding deadlocks when concurrent reads exceed
thread pool limits.
struct SpillReaderStream {
schema: SchemaRef,
- state: SpillReaderStreamState,
+ decoder: StreamDecoder,
+ byte_stream: Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send>>,
+ is_done: bool,
+
/// Maximum memory size observed among spilling sorted record batches.
/// This is used for validation purposes during reading each RecordBatch
from spill.
/// For context on why this value is recorded and validated,
/// see `physical_plan/sort/multi_level_merge.rs`.
max_record_batch_memory: Option<usize>,
-}
-// Small margin allowed to accommodate slight memory accounting variation
-const SPILL_BATCH_MEMORY_MARGIN: usize = 4096;
+ /// Holds leftover bytes from a chunk when a batch is yielded early
+ current_buffer: Buffer,
-/// When we poll for the next batch, we will get back both the batch and the
reader,
-/// so we can call `next` again.
-type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>,
Option<RecordBatch>)>;
+ /// Keeps the file alive until the stream is dropped
+ _spill_file: Arc<dyn SpillFile>,
-enum SpillReaderStreamState {
- /// Initial state: the stream was not initialized yet
- /// and the file was not opened
- Uninitialized(RefCountedTempFile),
-
- /// A read is in progress in a spawned blocking task for which we hold the
handle.
- ReadInProgress(SpawnedTask<NextRecordBatchResult>),
-
- /// A read has finished and we wait for being polled again in order to
start reading the next batch.
- Waiting(StreamReader<BufReader<File>>),
-
- /// The stream has finished, successfully or not.
- Done,
+ schema_validated: bool,
}
+// Small margin allowed to accommodate slight memory accounting variation
+const SPILL_BATCH_MEMORY_MARGIN: usize = 4096;
+
impl SpillReaderStream {
fn new(
schema: SchemaRef,
- spill_file: RefCountedTempFile,
+ spill_file: Arc<dyn SpillFile>,
max_record_batch_memory: Option<usize>,
- ) -> Self {
- Self {
+ ) -> Result<Self> {
+ let byte_stream = spill_file.read_stream()?;
+ // DataFusion controls what it writes so it can trust its own IPC
output,
+ // matching the behavior of the previous StreamReader-based
implementation.
+ let decoder = unsafe { StreamDecoder::new().with_skip_validation(true)
};
+ Ok(Self {
schema,
- state: SpillReaderStreamState::Uninitialized(spill_file),
+ decoder,
+ byte_stream,
max_record_batch_memory,
- }
+ is_done: false,
+ current_buffer: Buffer::from(&[]),
+ _spill_file: spill_file,
+ schema_validated: false,
+ })
}
+}
- fn poll_next_inner(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Result<RecordBatch>>> {
- match &mut self.state {
- SpillReaderStreamState::Uninitialized(_) => {
- // Temporarily replace with `Done` to be able to pass the file
to the task.
- let SpillReaderStreamState::Uninitialized(spill_file) =
- std::mem::replace(&mut self.state,
SpillReaderStreamState::Done)
- else {
- unreachable!()
- };
-
- let expected_schema = Arc::clone(&self.schema);
- let task = SpawnedTask::spawn_blocking(move || {
- let file = BufReader::new(File::open(spill_file.path())?);
- // SAFETY: DataFusion's spill writer strictly follows
Arrow IPC specifications
- // with validated schemas and buffers. Skip redundant
validation during read
- // to speedup read operation. This is safe for DataFusion
as input guaranteed to be correct when written.
- let mut reader = unsafe {
- StreamReader::try_new(file,
None)?.with_skip_validation(true)
- };
-
- // Validate the schema read from Arrow IPC file is the
same as the
- // schema of the current `SpillManager`
- let actual_schema = reader.schema();
-
- if actual_schema != expected_schema {
- return exec_err!(
- "Spill file schema mismatch: expected {}, got {}. \
- The caller must use the same SpillManager that
created the spill file to read it.",
- expected_schema,
- actual_schema
- );
- }
-
- // TODO: Same-schema reads from a different SpillManager
still pass today.
- // Add a SpillManager UID to IPC metadata and validate it
here as well.
- let next_batch = reader.next().transpose()?;
-
- Ok((reader, next_batch))
- });
+impl Stream for SpillReaderStream {
+ type Item = Result<RecordBatch>;
- self.state = SpillReaderStreamState::ReadInProgress(task);
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ let this = self.get_mut();
- // Poll again immediately so the inner task is polled and the
waker is
- // registered.
- self.poll_next_inner(cx)
- }
+ if this.is_done {
+ return Poll::Ready(None);
+ }
- SpillReaderStreamState::ReadInProgress(task) => {
- let result = futures::ready!(task.poll_unpin(cx))
- .unwrap_or_else(|err|
Err(DataFusionError::External(Box::new(err))));
-
- match result {
- Ok((reader, batch)) => {
- match batch {
- Some(batch) => {
- if let Some(max_record_batch_memory) =
- self.max_record_batch_memory
- {
- let actual_size =
- get_record_batch_memory_size(&batch);
- if actual_size
- > max_record_batch_memory
- + SPILL_BATCH_MEMORY_MARGIN
- {
- debug!(
- "Record batch memory usage
({actual_size} bytes) exceeds the expected limit ({max_record_batch_memory}
bytes) \n\
- by more than the allowed
tolerance ({SPILL_BATCH_MEMORY_MARGIN} bytes).\n\
- This likely indicates a bug in
memory accounting during spilling.\n\
- Please report this issue in
https://github.com/apache/datafusion/issues/17340."
- );
- }
- }
- self.state =
SpillReaderStreamState::Waiting(reader);
-
- Poll::Ready(Some(Ok(batch)))
+ loop {
+ if !this.current_buffer.is_empty() {
+ match this.decoder.decode(&mut this.current_buffer) {
+ Ok(Some(batch)) => {
+ // One-time schema validation on the first decoded
batch.
+ // The IPC stream embeds the writer's schema in its
header;
+ // StreamDecoder surfaces it via the first batch's
schema.
+ // We check here rather than in new() because schema
bytes
+ // only arrive after decoding the IPC header from the
stream.
+ if !this.schema_validated {
+ this.schema_validated = true;
+ let actual = batch.schema();
+ if actual != this.schema {
+ this.is_done = true;
+ return Poll::Ready(Some(Err(
+ datafusion_common::exec_datafusion_err!(
+ "Spill file schema mismatch: expected
{}, got {}. \
+ The caller must use the same SpillManager that created \
+ the spill file to read it.",
+ this.schema,
+ actual
+ ),
+ )));
}
- None => {
- // Stream is done
- self.state = SpillReaderStreamState::Done;
-
- Poll::Ready(None)
+ }
+ if let Some(max_record_batch_memory) =
+ this.max_record_batch_memory
+ {
+ let actual_size =
get_record_batch_memory_size(&batch);
+ if actual_size
+ > max_record_batch_memory +
SPILL_BATCH_MEMORY_MARGIN
+ {
+ debug!(
+ "Record batch memory usage ({actual_size}
bytes) exceeds the expected limit ({max_record_batch_memory} bytes) \n\
+ by more than the allowed tolerance
({SPILL_BATCH_MEMORY_MARGIN} bytes).\n\
+ This likely indicates a bug in memory
accounting during spilling."
+ );
}
}
+ return Poll::Ready(Some(Ok(batch)));
}
- Err(err) => {
- self.state = SpillReaderStreamState::Done;
-
- Poll::Ready(Some(Err(err)))
+ Ok(None) => {
+ // The chunk didn't form a complete message. Arrow
consumed the partial bytes
+ // into its internal scratch pad, leaving our
current_buffer completely empty.
+ // We do nothing and fall through to fetch more data.
+ }
+ Err(e) => {
+ this.is_done = true;
+ return Poll::Ready(Some(Err(e.into())));
}
}
}
- SpillReaderStreamState::Waiting(_) => {
- // Temporarily replace with `Done` to be able to pass the file
to the task.
- let SpillReaderStreamState::Waiting(mut reader) =
- std::mem::replace(&mut self.state,
SpillReaderStreamState::Done)
- else {
- unreachable!()
- };
-
- let task = SpawnedTask::spawn_blocking(move || {
- let next_batch = reader.next().transpose()?;
-
- Ok((reader, next_batch))
- });
-
- self.state = SpillReaderStreamState::ReadInProgress(task);
+ match futures::ready!(this.byte_stream.as_mut().poll_next(cx)) {
+ Some(Ok(chunk)) => {
+ this.current_buffer = Buffer::from(chunk);
+ }
+ Some(Err(e)) => {
+ this.is_done = true;
+ return Poll::Ready(Some(Err(e)));
+ }
+ None => {
+ this.is_done = true;
- // Poll again immediately so the inner task is polled and the
waker is
- // registered.
- self.poll_next_inner(cx)
+ if let Err(e) = this.decoder.finish() {
+ return Poll::Ready(Some(Err(e.into())));
+ }
+ return Poll::Ready(None);
+ }
}
-
- SpillReaderStreamState::Done => Poll::Ready(None),
}
}
}
-impl Stream for SpillReaderStream {
- type Item = Result<RecordBatch>;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
- self.get_mut().poll_next_inner(cx)
- }
-}
-
impl RecordBatchStream for SpillReaderStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
+/// A simple, unmonitored file writer to support the deprecated
`spill_record_batch_by_size` API.
Review Comment:
I just meant that it bypassed the `DiskManager` metrics tracking (it didn't
update `used_disk_space`). However, since you suggested below that this
deprecated code be removed, I will delete it entirely.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]