This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a6a4df9996 Fix memory reservation starvation in sort-merge (#20642)
a6a4df9996 is described below

commit a6a4df99963e0a2c0c49f1c17d23c9c4e5b59c68
Author: xudong.w <[email protected]>
AuthorDate: Wed Mar 18 19:32:23 2026 +0800

    Fix memory reservation starvation in sort-merge (#20642)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #.
    
    ## Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    This PR fixes memory reservation starvation in sort-merge when multiple
    sort partitions share a GreedyMemoryPool.
    
    When multiple `ExternalSorter` instances run concurrently and share a
    single memory pool, the merge phase starves:
    
    1. Each partition pre-reserves sort_spill_reservation_bytes via
    merge_reservation
    2. When entering the merge phase, sort() freed merge_reservation back to
    the pool and new_empty() was used to create a new reservation starting
    at 0 bytes, so the merge no longer held any of the pre-reserved bytes
    3. Those freed bytes were immediately consumed by other partitions
    racing for memory
    4. The merge could no longer allocate memory from the pool → OOM /
    starvation
    
    ## What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
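    -->
    
    - `ExternalSorter::sort()` hands `merge_reservation` to the streaming
    merge via `take()` on the spill path; `free()` is now only called on
    the in-memory (non-spill) path.
    - `MultiLevelMergeBuilder` starts from `self.reservation.take()` instead
    of `new_empty()` and only requests the deficit from the pool via the new
    `try_grow_reservation_to_at_least` helper.
    - `BatchBuilder` tracks `batches_mem_used` and `initial_reservation`, so
    it grows only when batches exceed the reservation and never shrinks
    below the initial (pre-reserved) size.
    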
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ~~I can't find a deterministic way to reproduce the bug, but it occurs
    in our production.~~ Added an end-to-end test to verify the fix.
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 datafusion/physical-plan/src/sorts/builder.rs      |  54 ++++++-
 .../physical-plan/src/sorts/multi_level_merge.rs   |  48 +++++--
 datafusion/physical-plan/src/sorts/sort.rs         | 159 ++++++++++++++++++++-
 3 files changed, 239 insertions(+), 22 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/builder.rs 
b/datafusion/physical-plan/src/sorts/builder.rs
index 9b2fa96822..a462b83205 100644
--- a/datafusion/physical-plan/src/sorts/builder.rs
+++ b/datafusion/physical-plan/src/sorts/builder.rs
@@ -40,9 +40,24 @@ pub struct BatchBuilder {
     /// Maintain a list of [`RecordBatch`] and their corresponding stream
     batches: Vec<(usize, RecordBatch)>,
 
-    /// Accounts for memory used by buffered batches
+    /// Accounts for memory used by buffered batches.
+    ///
+    /// May include pre-reserved bytes (from `sort_spill_reservation_bytes`)
+    /// that were transferred via [`MemoryReservation::take()`] to prevent
+    /// starvation when concurrent sort partitions compete for pool memory.
     reservation: MemoryReservation,
 
+    /// Tracks the actual memory used by buffered batches (not including
+    /// pre-reserved bytes). This allows [`Self::push_batch`] to skip pool
+    /// allocation requests when the pre-reserved bytes cover the batch.
+    batches_mem_used: usize,
+
+    /// The initial reservation size at construction time. When the reservation
+    /// is pre-loaded with `sort_spill_reservation_bytes` (via `take()`), this
+    /// records that amount so we never shrink below it, maintaining the
+    /// anti-starvation guarantee throughout the merge.
+    initial_reservation: usize,
+
     /// The current [`BatchCursor`] for each stream
     cursors: Vec<BatchCursor>,
 
@@ -59,19 +74,26 @@ impl BatchBuilder {
         batch_size: usize,
         reservation: MemoryReservation,
     ) -> Self {
+        let initial_reservation = reservation.size();
         Self {
             schema,
             batches: Vec::with_capacity(stream_count * 2),
             cursors: vec![BatchCursor::default(); stream_count],
             indices: Vec::with_capacity(batch_size),
             reservation,
+            batches_mem_used: 0,
+            initial_reservation,
         }
     }
 
     /// Append a new batch in `stream_idx`
     pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> 
Result<()> {
-        self.reservation
-            .try_grow(get_record_batch_memory_size(&batch))?;
+        let size = get_record_batch_memory_size(&batch);
+        self.batches_mem_used += size;
+        // Only request additional memory from the pool when actual batch
+        // usage exceeds the current reservation (which may include
+        // pre-reserved bytes from sort_spill_reservation_bytes).
+        try_grow_reservation_to_at_least(&mut self.reservation, 
self.batches_mem_used)?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
         self.cursors[stream_idx] = BatchCursor {
@@ -143,14 +165,38 @@ impl BatchBuilder {
                 stream_cursor.batch_idx = retained;
                 retained += 1;
             } else {
-                self.reservation.shrink(get_record_batch_memory_size(batch));
+                self.batches_mem_used -= get_record_batch_memory_size(batch);
             }
             retain
         });
 
+        // Release excess memory back to the pool, but never shrink below
+        // initial_reservation to maintain the anti-starvation guarantee
+        // for the merge phase.
+        let target = self.batches_mem_used.max(self.initial_reservation);
+        if self.reservation.size() > target {
+            self.reservation.shrink(self.reservation.size() - target);
+        }
+
         Ok(Some(RecordBatch::try_new(
             Arc::clone(&self.schema),
             columns,
         )?))
     }
 }
+
+/// Try to grow `reservation` so it covers at least `needed` bytes.
+///
+/// When a reservation has been pre-loaded with bytes (e.g. via
+/// [`MemoryReservation::take()`]), this avoids redundant pool
+/// allocations: if the reservation already covers `needed`, this is
+/// a no-op; otherwise only the deficit is requested from the pool.
+pub(crate) fn try_grow_reservation_to_at_least(
+    reservation: &mut MemoryReservation,
+    needed: usize,
+) -> Result<()> {
+    if needed > reservation.size() {
+        reservation.try_grow(needed - reservation.size())?;
+    }
+    Ok(())
+}
diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs 
b/datafusion/physical-plan/src/sorts/multi_level_merge.rs
index 2e0d668a29..8985e1d8c7 100644
--- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs
+++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs
@@ -30,6 +30,7 @@ use arrow::datatypes::SchemaRef;
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::MemoryReservation;
 
+use crate::sorts::builder::try_grow_reservation_to_at_least;
 use crate::sorts::sort::get_reserved_bytes_for_record_batch_size;
 use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
 use crate::stream::RecordBatchStreamAdapter;
@@ -253,7 +254,12 @@ impl MultiLevelMergeBuilder {
 
             // Need to merge multiple streams
             (_, _) => {
-                let mut memory_reservation = self.reservation.new_empty();
+                // Transfer any pre-reserved bytes (from 
sort_spill_reservation_bytes)
+                // to the merge memory reservation. This prevents starvation 
when
+                // concurrent sort partitions compete for pool memory: the 
pre-reserved
+                // bytes cover spill file buffer reservations without 
additional pool
+                // allocation.
+                let mut memory_reservation = self.reservation.take();
 
                 // Don't account for existing streams memory
                 // as we are not holding the memory for them
@@ -269,6 +275,15 @@ impl MultiLevelMergeBuilder {
 
                 let is_only_merging_memory_streams = 
sorted_spill_files.is_empty();
 
+                // If no spill files were selected (e.g. all too large for
+                // available memory but enough in-memory streams exist),
+                // return the pre-reserved bytes to self.reservation so
+                // create_new_merge_sort can transfer them to the merge
+                // stream's BatchBuilder.
+                if is_only_merging_memory_streams {
+                    mem::swap(&mut self.reservation, &mut memory_reservation);
+                }
+
                 for spill in sorted_spill_files {
                     let stream = self
                         .spill_manager
@@ -337,8 +352,10 @@ impl MultiLevelMergeBuilder {
             builder = builder.with_bypass_mempool();
         } else {
             // If we are only merging in-memory streams, we need to use the 
memory reservation
-            // because we don't know the maximum size of the batches in the 
streams
-            builder = builder.with_reservation(self.reservation.new_empty());
+            // because we don't know the maximum size of the batches in the 
streams.
+            // Use take() to transfer any pre-reserved bytes so the merge can 
use them
+            // as its initial budget without additional pool allocation.
+            builder = builder.with_reservation(self.reservation.take());
         }
 
         builder.build()
@@ -356,17 +373,24 @@ impl MultiLevelMergeBuilder {
     ) -> Result<(Vec<SortedSpillFile>, usize)> {
         assert_ne!(buffer_len, 0, "Buffer length must be greater than 0");
         let mut number_of_spills_to_read_for_current_phase = 0;
+        // Track total memory needed for spill file buffers. When the
+        // reservation has pre-reserved bytes (from 
sort_spill_reservation_bytes),
+        // those bytes cover the first N spill files without additional pool
+        // allocation, preventing starvation under memory pressure.
+        let mut total_needed: usize = 0;
 
         for spill in &self.sorted_spill_files {
-            // For memory pools that are not shared this is good, for other 
this is not
-            // and there should be some upper limit to memory reservation so 
we won't starve the system
-            match reservation.try_grow(
-                get_reserved_bytes_for_record_batch_size(
-                    spill.max_record_batch_memory,
-                    // Size will be the same as the sliced size, bc it is a 
spilled batch.
-                    spill.max_record_batch_memory,
-                ) * buffer_len,
-            ) {
+            let per_spill = get_reserved_bytes_for_record_batch_size(
+                spill.max_record_batch_memory,
+                // Size will be the same as the sliced size, bc it is a 
spilled batch.
+                spill.max_record_batch_memory,
+            ) * buffer_len;
+            total_needed += per_spill;
+
+            // For memory pools that are not shared this is good, for other
+            // this is not and there should be some upper limit to memory
+            // reservation so we won't starve the system.
+            match try_grow_reservation_to_at_least(reservation, total_needed) {
                 Ok(_) => {
                     number_of_spills_to_read_for_current_phase += 1;
                 }
diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index 2c5c82e723..da2171847c 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -342,11 +342,6 @@ impl ExternalSorter {
     /// 2. A combined streaming merge incorporating both in-memory
     ///    batches and data from spill files on disk.
     async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
-        // Release the memory reserved for merge back to the pool so
-        // there is some left when `in_mem_sort_stream` requests an
-        // allocation.
-        self.merge_reservation.free();
-
         if self.spilled_before() {
             // Sort `in_mem_batches` and spill it first. If there are many
             // `in_mem_batches` and the memory limit is almost reached, merging
@@ -355,6 +350,13 @@ impl ExternalSorter {
                 self.sort_and_spill_in_mem_batches().await?;
             }
 
+            // Transfer the pre-reserved merge memory to the streaming merge
+            // using `take()` instead of `new_empty()`. This ensures the merge
+            // stream starts with `sort_spill_reservation_bytes` already
+            // allocated, preventing starvation when concurrent sort partitions
+            // compete for pool memory. `take()` moves the bytes atomically
+            // without releasing them back to the pool, so other partitions
+            // cannot race to consume the freed memory.
             StreamingMergeBuilder::new()
                 .with_sorted_spill_files(std::mem::take(&mut 
self.finished_spill_files))
                 .with_spill_manager(self.spill_manager.clone())
@@ -363,9 +365,14 @@ impl ExternalSorter {
                 .with_metrics(self.metrics.baseline.clone())
                 .with_batch_size(self.batch_size)
                 .with_fetch(None)
-                .with_reservation(self.merge_reservation.new_empty())
+                .with_reservation(self.merge_reservation.take())
                 .build()
         } else {
+            // Release the memory reserved for merge back to the pool so
+            // there is some left when `in_mem_sort_stream` requests an
+            // allocation. Only needed for the non-spill path; the spill
+            // path transfers the reservation to the merge stream instead.
+            self.merge_reservation.free();
             self.in_mem_sort_stream(self.metrics.baseline.clone())
         }
     }
@@ -375,6 +382,12 @@ impl ExternalSorter {
         self.reservation.size()
     }
 
+    /// How much memory is reserved for the merge phase?
+    #[cfg(test)]
+    fn merge_reservation_size(&self) -> usize {
+        self.merge_reservation.size()
+    }
+
     /// How many bytes have been spilled to disk?
     fn spilled_bytes(&self) -> usize {
         self.metrics.spill_metrics.spilled_bytes.value()
@@ -2716,4 +2729,138 @@ mod tests {
 
         Ok(())
     }
+
+    /// Verifies that `ExternalSorter::sort()` transfers the pre-reserved
+    /// merge bytes to the merge stream via `take()`, rather than leaving
+    /// them in the sorter (via `new_empty()`).
+    ///
+    /// 1. Create a sorter with a tight memory pool and insert enough data
+    ///    to force spilling
+    /// 2. Verify `merge_reservation` holds the pre-reserved bytes before sort
+    /// 3. Call `sort()` to get the merge stream
+    /// 4. Verify `merge_reservation` is now 0 (bytes transferred to merge 
stream)
+    /// 5. Simulate contention: a competing consumer grabs all available pool 
memory
+    /// 6. Verify the merge stream still works (it uses its pre-reserved bytes
+    ///    as initial budget, not requesting from pool starting at 0)
+    ///
+    /// With `new_empty()` (before fix), step 4 fails: `merge_reservation`
+    /// still holds the bytes, the merge stream starts with 0 budget, and
+    /// those bytes become unaccounted-for reserved memory that nobody uses.
+    #[tokio::test]
+    async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> 
{
+        use datafusion_execution::memory_pool::{
+            GreedyMemoryPool, MemoryConsumer, MemoryPool,
+        };
+        use futures::TryStreamExt;
+
+        let sort_spill_reservation_bytes: usize = 10 * 1024; // 10 KB
+
+        // Pool: merge reservation (10KB) + enough room for sort to work.
+        // The room must accommodate batch data accumulation before spilling.
+        let sort_working_memory: usize = 40 * 1024; // 40 KB for sort 
operations
+        let pool_size = sort_spill_reservation_bytes + sort_working_memory;
+        let pool: Arc<dyn MemoryPool> = 
Arc::new(GreedyMemoryPool::new(pool_size));
+
+        let runtime = RuntimeEnvBuilder::new()
+            .with_memory_pool(Arc::clone(&pool))
+            .build_arc()?;
+
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let schema = Arc::new(Schema::new(vec![Field::new("x", 
DataType::Int32, false)]));
+
+        let mut sorter = ExternalSorter::new(
+            0,
+            Arc::clone(&schema),
+            [PhysicalSortExpr::new_default(Arc::new(Column::new("x", 
0)))].into(),
+            128, // batch_size
+            sort_spill_reservation_bytes,
+            usize::MAX, // sort_in_place_threshold_bytes (high to avoid concat 
path)
+            SpillCompression::Uncompressed,
+            &metrics_set,
+            Arc::clone(&runtime),
+        )?;
+
+        // Insert enough data to force spilling.
+        let num_batches = 200;
+        for i in 0..num_batches {
+            let values: Vec<i32> = ((i * 100)..((i + 1) * 
100)).rev().collect();
+            let batch = RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(Int32Array::from(values))],
+            )?;
+            sorter.insert_batch(batch).await?;
+        }
+
+        assert!(
+            sorter.spilled_before(),
+            "Test requires spilling to exercise the merge path"
+        );
+
+        // Before sort(), merge_reservation holds sort_spill_reservation_bytes.
+        assert!(
+            sorter.merge_reservation_size() >= sort_spill_reservation_bytes,
+            "merge_reservation should hold the pre-reserved bytes before 
sort()"
+        );
+
+        // Call sort() to get the merge stream. With the fix (take()),
+        // the pre-reserved merge bytes are transferred to the merge
+        // stream. Without the fix (free() + new_empty()), the bytes
+        // are released back to the pool and the merge stream starts
+        // with 0 bytes.
+        let merge_stream = sorter.sort().await?;
+
+        // THE KEY ASSERTION: after sort(), merge_reservation must be 0.
+        // This proves take() transferred the bytes to the merge stream,
+        // rather than them being freed back to the pool where other
+        // partitions could steal them.
+        assert_eq!(
+            sorter.merge_reservation_size(),
+            0,
+            "After sort(), merge_reservation should be 0 (bytes transferred \
+             to merge stream via take()). If non-zero, the bytes are still \
+             held by the sorter and will be freed on drop, allowing other \
+             partitions to steal them."
+        );
+
+        // Drop the sorter to free its reservations back to the pool.
+        drop(sorter);
+
+        // Simulate contention: another partition grabs ALL available
+        // pool memory. If the merge stream didn't receive the
+        // pre-reserved bytes via take(), it will fail when it tries
+        // to allocate memory for reading spill files.
+        let contender = 
MemoryConsumer::new("CompetingPartition").register(&pool);
+        let available = pool_size.saturating_sub(pool.reserved());
+        if available > 0 {
+            contender.try_grow(available).unwrap();
+        }
+
+        // The merge stream must still produce correct results despite
+        // the pool being fully consumed by the contender. This only
+        // works if sort() transferred the pre-reserved bytes to the
+        // merge stream (via take()) rather than freeing them.
+        let batches: Vec<RecordBatch> = merge_stream.try_collect().await?;
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(
+            total_rows,
+            (num_batches * 100) as usize,
+            "Merge stream should produce all rows even under memory contention"
+        );
+
+        // Verify data is sorted
+        let merged = concat_batches(&schema, &batches)?;
+        let col = merged.column(0).as_primitive::<Int32Type>();
+        for i in 1..col.len() {
+            assert!(
+                col.value(i - 1) <= col.value(i),
+                "Output should be sorted, but found {} > {} at index {}",
+                col.value(i - 1),
+                col.value(i),
+                i
+            );
+        }
+
+        drop(contender);
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to