gratus00 commented on code in PR #21629:
URL: https://github.com/apache/datafusion/pull/21629#discussion_r3097558112


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -402,381 +496,199 @@ impl ExternalSorter {
         self.metrics.spill_metrics.spill_file_count.value()
     }
 
-    /// Appending globally sorted batches to the in-progress spill file, and 
clears
-    /// the `globally_sorted_batches` (also its memory reservation) afterwards.
-    async fn consume_and_spill_append(
-        &mut self,
-        globally_sorted_batches: &mut Vec<RecordBatch>,
-    ) -> Result<()> {
-        if globally_sorted_batches.is_empty() {
-            return Ok(());
-        }
-
-        // Lazily initialize the in-progress spill file
-        if self.in_progress_spill_file.is_none() {
-            self.in_progress_spill_file =
-                Some((self.spill_manager.create_in_progress_file("Sorting")?, 
0));
-        }
-
-        Self::organize_stringview_arrays(globally_sorted_batches)?;
-
-        debug!("Spilling sort data of ExternalSorter to disk whilst 
inserting");
-
-        let batches_to_spill = std::mem::take(globally_sorted_batches);
-        self.reservation.free();
-
-        let (in_progress_file, max_record_batch_size) =
-            self.in_progress_spill_file.as_mut().ok_or_else(|| {
-                internal_datafusion_err!("In-progress spill file should be 
initialized")
-            })?;
-
-        for batch in batches_to_spill {
-            in_progress_file.append_batch(&batch)?;
-
-            *max_record_batch_size =
-                (*max_record_batch_size).max(batch.get_sliced_size()?);
-        }
-
-        assert_or_internal_err!(
-            globally_sorted_batches.is_empty(),
-            "This function consumes globally_sorted_batches, so it should be 
empty after taking."
-        );
-
-        Ok(())
-    }
-
-    /// Finishes the in-progress spill file and moves it to the finished spill 
files.
-    async fn spill_finish(&mut self) -> Result<()> {
-        let (mut in_progress_file, max_record_batch_memory) =
-            self.in_progress_spill_file.take().ok_or_else(|| {
-                internal_datafusion_err!("Should be called after 
`spill_append`")
-            })?;
-        let spill_file = in_progress_file.finish()?;
-
-        if let Some(spill_file) = spill_file {
-            self.finished_spill_files.push(SortedSpillFile {
-                file: spill_file,
-                max_record_batch_memory,
-            });
-        }
-
-        Ok(())
-    }
-
-    /// Reconstruct `globally_sorted_batches` to organize the payload buffers 
of each
-    /// `StringViewArray` in sequential order by calling `gc()` on them.
-    ///
-    /// Note this is a workaround until 
<https://github.com/apache/arrow-rs/issues/7185> is
-    /// available
-    ///
-    /// # Rationale
-    /// After (merge-based) sorting, all batches will be sorted into a single 
run,
-    /// but physically this sorted run is chunked into many small batches. For
-    /// `StringViewArray`s inside each sorted run, their inner buffers are not
-    /// re-constructed by default, leading to non-sequential payload locations
-    /// (permutated by `interleave()` Arrow kernel). A single payload buffer 
might
-    /// be shared by multiple `RecordBatch`es.
-    /// When writing each batch to disk, the writer has to write all 
referenced buffers,
-    /// because they have to be read back one by one to reduce memory usage. 
This
-    /// causes extra disk reads and writes, and potentially execution failure.
+    /// Spills sorted runs to disk.
     ///
-    /// # Example
-    /// Before sorting:
-    /// batch1 -> buffer1
-    /// batch2 -> buffer2
+    /// Two strategies depending on available merge headroom:
     ///
-    /// sorted_batch1 -> buffer1
-    ///               -> buffer2
-    /// sorted_batch2 -> buffer1
-    ///               -> buffer2
+    /// - **With headroom** (`merge_reservation > 0`): merge all runs into
+    ///   a single globally-sorted stream, then write to one spill file.
+    ///   Fewer spill files = lower fan-in for the final MultiLevelMerge.
     ///
-    /// Then when spilling each batch, the writer has to write all referenced 
buffers
-    /// repeatedly.
-    fn organize_stringview_arrays(
-        globally_sorted_batches: &mut Vec<RecordBatch>,
-    ) -> Result<()> {
-        let mut organized_batches = 
Vec::with_capacity(globally_sorted_batches.len());
-
-        for batch in globally_sorted_batches.drain(..) {
-            let mut new_columns: Vec<Arc<dyn Array>> =
-                Vec::with_capacity(batch.num_columns());
+    /// - **Without headroom** (`merge_reservation == 0`): spill each run
+    ///   as its own file. Avoids allocating merge cursor infrastructure
+    ///   when the pool has no room. MultiLevelMerge handles the higher
+    ///   fan-in with dynamic memory management.
+    async fn spill_sorted_runs(&mut self) -> Result<()> {
+        assert_or_internal_err!(
+            self.has_sorted_runs(),
+            "sorted_runs must not be empty when attempting to spill"
+        );
 
-            let mut arr_mutated = false;
-            for array in batch.columns() {
-                if let Some(string_view_array) =
-                    array.as_any().downcast_ref::<StringViewArray>()
-                {
-                    let new_array = string_view_array.gc();
-                    new_columns.push(Arc::new(new_array));
-                    arr_mutated = true;
-                } else {
-                    new_columns.push(Arc::clone(array));
-                }
-            }
+        if self.merge_reservation.size() > 0 && self.sorted_runs.len() > 1 {

Review Comment:
   Hi, I'm tracing the spill cycles, and the first spill's `merge_reservation`
value starts at 0, while for later spills it is grown at the end of
`spill_sorted_runs`.
   
   So on the second spill the value could be something small like 1024 and
still take the merge path.
   
   Is `> 0` just a good-enough heuristic that we are using, without trying to
guarantee actual headroom for the merge process?
   
   
   Feel free to correct me; I'm new and I'm trying to learn!



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -402,381 +496,199 @@ impl ExternalSorter {
         self.metrics.spill_metrics.spill_file_count.value()
     }
 
-    /// Appending globally sorted batches to the in-progress spill file, and 
clears
-    /// the `globally_sorted_batches` (also its memory reservation) afterwards.
-    async fn consume_and_spill_append(
-        &mut self,
-        globally_sorted_batches: &mut Vec<RecordBatch>,
-    ) -> Result<()> {
-        if globally_sorted_batches.is_empty() {
-            return Ok(());
-        }
-
-        // Lazily initialize the in-progress spill file
-        if self.in_progress_spill_file.is_none() {
-            self.in_progress_spill_file =
-                Some((self.spill_manager.create_in_progress_file("Sorting")?, 
0));
-        }
-
-        Self::organize_stringview_arrays(globally_sorted_batches)?;
-
-        debug!("Spilling sort data of ExternalSorter to disk whilst 
inserting");
-
-        let batches_to_spill = std::mem::take(globally_sorted_batches);
-        self.reservation.free();
-
-        let (in_progress_file, max_record_batch_size) =
-            self.in_progress_spill_file.as_mut().ok_or_else(|| {
-                internal_datafusion_err!("In-progress spill file should be 
initialized")
-            })?;
-
-        for batch in batches_to_spill {
-            in_progress_file.append_batch(&batch)?;
-
-            *max_record_batch_size =
-                (*max_record_batch_size).max(batch.get_sliced_size()?);
-        }
-
-        assert_or_internal_err!(
-            globally_sorted_batches.is_empty(),
-            "This function consumes globally_sorted_batches, so it should be 
empty after taking."
-        );
-
-        Ok(())
-    }
-
-    /// Finishes the in-progress spill file and moves it to the finished spill 
files.
-    async fn spill_finish(&mut self) -> Result<()> {
-        let (mut in_progress_file, max_record_batch_memory) =
-            self.in_progress_spill_file.take().ok_or_else(|| {
-                internal_datafusion_err!("Should be called after 
`spill_append`")
-            })?;
-        let spill_file = in_progress_file.finish()?;
-
-        if let Some(spill_file) = spill_file {
-            self.finished_spill_files.push(SortedSpillFile {
-                file: spill_file,
-                max_record_batch_memory,
-            });
-        }
-
-        Ok(())
-    }
-
-    /// Reconstruct `globally_sorted_batches` to organize the payload buffers 
of each
-    /// `StringViewArray` in sequential order by calling `gc()` on them.
-    ///
-    /// Note this is a workaround until 
<https://github.com/apache/arrow-rs/issues/7185> is
-    /// available
-    ///
-    /// # Rationale
-    /// After (merge-based) sorting, all batches will be sorted into a single 
run,
-    /// but physically this sorted run is chunked into many small batches. For
-    /// `StringViewArray`s inside each sorted run, their inner buffers are not
-    /// re-constructed by default, leading to non-sequential payload locations
-    /// (permutated by `interleave()` Arrow kernel). A single payload buffer 
might
-    /// be shared by multiple `RecordBatch`es.
-    /// When writing each batch to disk, the writer has to write all 
referenced buffers,
-    /// because they have to be read back one by one to reduce memory usage. 
This
-    /// causes extra disk reads and writes, and potentially execution failure.
+    /// Spills sorted runs to disk.
     ///
-    /// # Example
-    /// Before sorting:
-    /// batch1 -> buffer1
-    /// batch2 -> buffer2
+    /// Two strategies depending on available merge headroom:
     ///
-    /// sorted_batch1 -> buffer1
-    ///               -> buffer2
-    /// sorted_batch2 -> buffer1
-    ///               -> buffer2
+    /// - **With headroom** (`merge_reservation > 0`): merge all runs into
+    ///   a single globally-sorted stream, then write to one spill file.
+    ///   Fewer spill files = lower fan-in for the final MultiLevelMerge.
     ///
-    /// Then when spilling each batch, the writer has to write all referenced 
buffers
-    /// repeatedly.
-    fn organize_stringview_arrays(
-        globally_sorted_batches: &mut Vec<RecordBatch>,
-    ) -> Result<()> {
-        let mut organized_batches = 
Vec::with_capacity(globally_sorted_batches.len());
-
-        for batch in globally_sorted_batches.drain(..) {
-            let mut new_columns: Vec<Arc<dyn Array>> =
-                Vec::with_capacity(batch.num_columns());
+    /// - **Without headroom** (`merge_reservation == 0`): spill each run
+    ///   as its own file. Avoids allocating merge cursor infrastructure
+    ///   when the pool has no room. MultiLevelMerge handles the higher
+    ///   fan-in with dynamic memory management.
+    async fn spill_sorted_runs(&mut self) -> Result<()> {
+        assert_or_internal_err!(
+            self.has_sorted_runs(),
+            "sorted_runs must not be empty when attempting to spill"
+        );
 
-            let mut arr_mutated = false;
-            for array in batch.columns() {
-                if let Some(string_view_array) =
-                    array.as_any().downcast_ref::<StringViewArray>()
-                {
-                    let new_array = string_view_array.gc();
-                    new_columns.push(Arc::new(new_array));
-                    arr_mutated = true;
-                } else {
-                    new_columns.push(Arc::clone(array));
-                }
-            }
+        if self.merge_reservation.size() > 0 && self.sorted_runs.len() > 1 {
+            debug!(
+                "Spilling {} sorted runs via merge to single file",
+                self.sorted_runs.len()
+            );
+            // Free merge_reservation to provide pool headroom for the
+            // merge cursor allocation. Re-reserved at the end.
+            self.merge_reservation.free();
 
-            let organized_batch = if arr_mutated {
-                RecordBatch::try_new(batch.schema(), new_columns)?
-            } else {
-                batch
-            };
+            let mut sorted_stream =
+                self.merge_sorted_runs(self.metrics.baseline.intermediate())?;
+            assert_or_internal_err!(
+                self.sorted_runs.is_empty(),
+                "sorted_runs should be empty after constructing sorted stream"
+            );
 
-            organized_batches.push(organized_batch);
-        }
+            let mut in_progress =
+                self.spill_manager.create_in_progress_file("Sorting")?;
+            let mut max_batch_memory = 0usize;
 
-        *globally_sorted_batches = organized_batches;
+            while let Some(batch) = sorted_stream.next().await {
+                let batch = batch?;
+                max_batch_memory = 
max_batch_memory.max(batch.get_sliced_size()?);
+                in_progress.append_batch(&batch)?;
+            }
 
-        Ok(())
-    }
+            drop(sorted_stream);
+            self.reservation.free();
 
-    /// Sorts the in-memory batches and merges them into a single sorted run, 
then writes
-    /// the result to spill files.
-    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
-        assert_or_internal_err!(
-            !self.in_mem_batches.is_empty(),
-            "in_mem_batches must not be empty when attempting to sort and 
spill"
-        );
+            let spill_file = in_progress.finish()?;
+            if let Some(spill_file) = spill_file {
+                self.finished_spill_files.push(SortedSpillFile {
+                    file: spill_file,
+                    max_record_batch_memory: max_batch_memory,
+                });
+            }
+        } else {
+            // No merge headroom or single run: spill each run as its own
+            // file to avoid allocating merge cursor infrastructure.
+            debug!(
+                "Spilling {} sorted runs as individual files (no merge 
headroom)",
+                self.sorted_runs.len()
+            );
+            let all_runs = std::mem::take(&mut self.sorted_runs);
+            self.sorted_runs_memory = 0;
+            for run in all_runs {
+                let run_size: usize = 
run.iter().map(get_record_batch_memory_size).sum();
+
+                let mut in_progress =
+                    self.spill_manager.create_in_progress_file("Sorting")?;
+                let mut max_batch_memory = 0usize;
+                for batch in &run {
+                    in_progress.append_batch(batch)?;
+                    max_batch_memory = 
max_batch_memory.max(batch.get_sliced_size()?);
+                }
 
-        // Release the memory reserved for merge back to the pool so
-        // there is some left when `in_mem_sort_stream` requests an
-        // allocation. At the end of this function, memory will be
-        // reserved again for the next spill.
-        self.merge_reservation.free();
+                let spill_file = in_progress.finish()?;
+                if let Some(spill_file) = spill_file {
+                    self.finished_spill_files.push(SortedSpillFile {
+                        file: spill_file,
+                        max_record_batch_memory: max_batch_memory,
+                    });
+                }
 
-        let mut sorted_stream =
-            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
-        // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` 
is taken
-        // to construct a globally sorted stream.
-        assert_or_internal_err!(
-            self.in_mem_batches.is_empty(),
-            "in_mem_batches should be empty after constructing sorted stream"
-        );
-        // 'global' here refers to all buffered batches when the memory limit 
is
-        // reached. This variable will buffer the sorted batches after
-        // sort-preserving merge and incrementally append to spill files.
-        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
-
-        while let Some(batch) = sorted_stream.next().await {
-            let batch = batch?;
-            let sorted_size = get_reserved_bytes_for_record_batch(&batch)?;
-            if self.reservation.try_grow(sorted_size).is_err() {
-                // Although the reservation is not enough, the batch is
-                // already in memory, so it's okay to combine it with 
previously
-                // sorted batches, and spill together.
-                globally_sorted_batches.push(batch);
-                self.consume_and_spill_append(&mut globally_sorted_batches)
-                    .await?; // reservation is freed in spill()
-            } else {
-                globally_sorted_batches.push(batch);
+                drop(run);
+                self.reservation
+                    .shrink(run_size.min(self.reservation.size()));
             }
         }
 
-        // Drop early to free up memory reserved by the sorted stream, 
otherwise the
-        // upcoming `self.reserve_memory_for_merge()` may fail due to 
insufficient memory.
-        drop(sorted_stream);
-
-        self.consume_and_spill_append(&mut globally_sorted_batches)
-            .await?;
-        self.spill_finish().await?;
-
-        // Sanity check after spilling
-        let buffers_cleared_property =
-            self.in_mem_batches.is_empty() && 
globally_sorted_batches.is_empty();
-        assert_or_internal_err!(
-            buffers_cleared_property,
-            "in_mem_batches and globally_sorted_batches should be cleared 
before"
-        );
-
-        // Reserve headroom for next sort/merge
         self.reserve_memory_for_merge()?;
 
         Ok(())
     }
 
-    /// Consumes in_mem_batches returning a sorted stream of
-    /// batches. This proceeds in one of two ways:
-    ///
-    /// # Small Datasets
-    ///
-    /// For "smaller" datasets, the data is first concatenated into a
-    /// single batch and then sorted. This is often faster than
-    /// sorting and then merging.
+    /// Merges the pre-sorted runs stored in `sorted_runs` into a single
+    /// sorted output stream. Each run is already sorted internally; this
+    /// method k-way merges them using the loser tree.
     ///
     /// ```text
-    ///        ┌─────┐
-    ///        │  2  │
-    ///        │  3  │
-    ///        │  1  │─ ─ ─ ─ ┐            ┌─────┐
-    ///        │  4  │                     │  2  │
-    ///        │  2  │        │            │  3  │
-    ///        └─────┘                     │  1  │             sorted output
-    ///        ┌─────┐        ▼            │  4  │                stream
-    ///        │  1  │                     │  2  │
-    ///        │  4  │─ ─▶ concat ─ ─ ─ ─ ▶│  1  │─ ─ ▶  sort  ─ ─ ─ ─ ─▶
-    ///        │  1  │                     │  4  │
-    ///        └─────┘        ▲            │  1  │
-    ///          ...          │            │ ... │
-    ///                                    │  4  │
-    ///        ┌─────┐        │            │  3  │
-    ///        │  4  │                     └─────┘
-    ///        │  3  │─ ─ ─ ─ ┘
-    ///        └─────┘
-    ///     in_mem_batches
-    /// ```
-    ///
-    /// # Larger datasets
-    ///
-    /// For larger datasets, the batches are first sorted individually
-    /// and then merged together.
-    ///
-    /// ```text
-    ///      ┌─────┐                ┌─────┐
-    ///      │  2  │                │  1  │
-    ///      │  3  │                │  2  │
-    ///      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ┐
-    ///      │  4  │                │  3  │
-    ///      │  2  │                │  4  │          │
-    ///      └─────┘                └─────┘               sorted output
-    ///      ┌─────┐                ┌─────┐          ▼       stream
-    ///      │  1  │                │  1  │
-    ///      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ▶ merge  ─ ─ ─ ─▶
-    ///      │  1  │                │  4  │
-    ///      └─────┘                └─────┘          ▲
-    ///        ...       ...         ...             │
-    ///
-    ///      ┌─────┐                ┌─────┐          │
-    ///      │  4  │                │  3  │
-    ///      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ┘
-    ///      └─────┘                └─────┘
-    ///
-    ///   in_mem_batches
+    ///   sorted_runs[0]                sorted_runs[1]
+    ///   ┌─────┐ ┌─────┐              ┌─────┐ ┌─────┐
+    ///   │ 1,2 │ │ 3,4 │              │ 1,3 │ │ 5,7 │
+    ///   └──┬──┘ └──┬──┘              └──┬──┘ └──┬──┘
+    ///      └───┬───┘                    └───┬───┘
+    ///          ▼                            ▼
+    ///     stream 0  ─ ─ ─ ─ ─ ─ ─▶  merge  ◀─ ─ ─  stream 1
+    ///                                  │
+    ///                                  ▼
+    ///                          sorted output stream
     /// ```
-    fn in_mem_sort_stream(
+    fn merge_sorted_runs(
         &mut self,
         metrics: BaselineMetrics,
     ) -> Result<SendableRecordBatchStream> {
-        if self.in_mem_batches.is_empty() {
+        let all_runs = std::mem::take(&mut self.sorted_runs);
+        self.sorted_runs_memory = 0;
+
+        if all_runs.is_empty() {
             return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                 &self.schema,
             ))));
         }
 
-        // The elapsed compute timer is updated when the value is dropped.
-        // There is no need for an explicit call to drop.
         let elapsed_compute = metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer();
 
-        // Please pay attention that any operation inside of 
`in_mem_sort_stream` will
-        // not perform any memory reservation. This is for avoiding the need 
of handling
-        // reservation failure and spilling in the middle of the sort/merge. 
The memory
-        // space for batches produced by the resulting stream will be reserved 
by the
-        // consumer of the stream.
-
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
+        // Single run: stream the chunks directly, no merge needed
+        if all_runs.len() == 1 {
+            let run = all_runs.into_iter().next().unwrap();
             let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, &metrics, reservation);
-        }
-
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_bytes_for_record_batch(&batch)?)
-                .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, &metrics, reservation);
-        }
-
-        let streams = std::mem::take(&mut self.in_mem_batches)
+            let schema = Arc::clone(&self.schema);
+            let output_rows = metrics.output_rows().clone();
+            let stream =
+                futures::stream::iter(run.into_iter().map(Ok)).map(move 
|batch| {
+                    match batch {
+                        Ok(batch) => {
+                            output_rows.add(batch.num_rows());
+                            Ok(batch)
+                        }
+                        Err(e) => Err(e),
+                    }
+                });
+            return Ok(Box::pin(ReservationStream::new(
+                Arc::clone(&schema),
+                Box::pin(RecordBatchStreamAdapter::new(schema, stream)),
+                reservation,
+            )));
+        }
+
+        // Multiple runs: create one stream per run and merge.
+        //
+        // Memory model for the multi-run merge:
+        // - self.reservation holds the sorted run data. It stays allocated
+        //   for the lifetime of the ExternalSorter (freed on drop). This
+        //   over-reserves as runs are consumed, but is conservative/safe.
+        // - The merge cursor (RowCursorStream/FieldCursorStream) allocates
+        //   from a new_empty() reservation, drawing from pool headroom
+        //   freed by merge_reservation.free() in the caller.
+        // - This works because sort() only enters this path when
+        //   merge_reservation > 0, guaranteeing pool headroom for cursors.
+        //   When merge_reservation == 0, sort() takes the spill path instead.
+        let streams = all_runs

Review Comment:
   If I'm not mistaken, `merge_sorted_runs()` is also called on the non-spill
path inside `sort()` (when `self.spilled_before()` evaluates to false), where
`merge_reservation` is never pre-reserved.
   
   Maybe we can make the comment more descriptive about how the different
paths rely on headroom differently?
   
   



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -323,56 +285,188 @@ impl ExternalSorter {
         self.reserve_memory_for_batch_and_maybe_spill(&input)
             .await?;
 
-        self.in_mem_batches.push(input);
+        let coalescer = self
+            .coalescer
+            .as_mut()
+            .expect("coalescer must exist during insert phase");
+        coalescer
+            .push_batch(input)
+            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
+
+        self.drain_completed_batches()?;
+
+        Ok(())
+    }
+
+    /// Drains completed (full) batches from the coalescer, sorts each,
+    /// and appends the sorted chunks to `sorted_runs`.
+    fn drain_completed_batches(&mut self) -> Result<()> {
+        // Collect completed batches first to avoid borrow conflict
+        let mut completed = vec![];
+        if let Some(coalescer) = self.coalescer.as_mut() {
+            while let Some(batch) = coalescer.next_completed_batch() {
+                completed.push(batch);
+            }
+        }
+        for batch in &completed {
+            self.sort_and_store_run(batch)?;
+        }
+        Ok(())
+    }
+
+    /// Sorts a single coalesced batch and stores the result as a new run.
+    /// Output is chunked back to `batch_size`.
+    fn sort_and_store_run(&mut self, batch: &RecordBatch) -> Result<()> {
+        let sorted_chunks = sort_batch_chunked(batch, &self.expr, 
self.batch_size)?;
+
+        // After take(), StringView arrays may reference shared buffers from
+        // multiple coalesced input batches, inflating reported memory size.
+        // GC compacts them so reservation tracking stays accurate.
+        let sorted_chunks = Self::gc_stringview_batches(sorted_chunks)?;
+
+        let run_size: usize =
+            sorted_chunks.iter().map(get_record_batch_memory_size).sum();
+
+        self.sorted_runs.push(sorted_chunks);
+        self.sorted_runs_memory += run_size;
+
+        // Align the pool reservation to match actual sorted run memory.
+        //
+        // Before sorting we reserve 2x the input batch size (space for
+        // both the unsorted input and the sorted output). After sorting
+        // we drop the input, so normally sorted_runs_memory < reservation
+        // and we shrink to free the excess back to the pool.
+        //
+        // The grow path handles a rare edge case: for very small batches
+        // (single-digit rows), Arrow's per-column buffer minimums (64
+        // bytes each) can make the sorted output slightly larger than
+        // the reservation. We use grow() rather than try_grow() because:
+        //
+        //   1. The memory is already allocated — the sorted run exists
+        //      in self.sorted_runs. This is accounting catch-up, not a
+        //      new allocation request.
+        //   2. Under-reporting is worse than over-reporting. If we
+        //      swallowed a try_grow() failure, the pool would think
+        //      there is free headroom that doesn't actually exist,
+        //      which could cause other operators to over-allocate and
+        //      trigger a real OOM.
+        //   3. The overshoot is small and bounded: it is at most the
+        //      per-column buffer overhead for a handful of rows, which
+        //      is tens of KB even with wide schemas.
+        let reservation_size = self.reservation.size();
+        if reservation_size > self.sorted_runs_memory {
+            self.reservation
+                .shrink(reservation_size - self.sorted_runs_memory);
+        } else if self.sorted_runs_memory > reservation_size {
+            self.reservation
+                .grow(self.sorted_runs_memory - reservation_size);
+        }
+
+        debug_assert_eq!(
+            self.reservation.size(),
+            self.sorted_runs_memory,
+            "reservation should track sorted_runs_memory after adjustment"
+        );
+
+        Ok(())
+    }
+
+    /// Compact StringView arrays in sorted batches to eliminate shared
+    /// buffer references from `take()`. Skips work if no StringView columns.
+    fn gc_stringview_batches(batches: Vec<RecordBatch>) -> 
Result<Vec<RecordBatch>> {
+        // Fast path: check schema for any StringView columns
+        if let Some(first) = batches.first() {
+            let has_stringview = first.schema().fields().iter().any(|f| {
+                matches!(f.data_type(), DataType::Utf8View | 
DataType::BinaryView)
+            });
+            if !has_stringview {
+                return Ok(batches);
+            }
+        }
+
+        let mut result = Vec::with_capacity(batches.len());
+        for batch in batches {
+            let mut new_columns: Vec<Arc<dyn Array>> =
+                Vec::with_capacity(batch.num_columns());
+            let mut mutated = false;
+            for array in batch.columns() {
+                if let Some(sv) = 
array.as_any().downcast_ref::<StringViewArray>() {
+                    new_columns.push(Arc::new(sv.gc()));
+                    mutated = true;
+                } else {
+                    new_columns.push(Arc::clone(array));
+                }
+            }
+            if mutated {
+                result.push(RecordBatch::try_new(batch.schema(), 
new_columns)?);
+            } else {
+                result.push(batch);
+            }
+        }
+        Ok(result)
+    }
+
+    /// Flushes any partially accumulated rows from the coalescer, sorts them,
+    /// and stores as a run. Called before spilling and at sort() time.
+    fn flush_coalescer(&mut self) -> Result<()> {
+        if let Some(coalescer) = self.coalescer.as_mut() {
+            coalescer
+                .finish_buffered_batch()
+                .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
+            self.drain_completed_batches()?;
+        }
         Ok(())
     }
 
     fn spilled_before(&self) -> bool {
         !self.finished_spill_files.is_empty()
     }
 
+    /// Returns true if there are sorted runs in memory.
+    fn has_sorted_runs(&self) -> bool {
+        !self.sorted_runs.is_empty()
+    }
+
     /// Returns the final sorted output of all batches inserted via
     /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
     ///
     /// This process could either be:
     ///
-    /// 1. An in-memory sort/merge (if the input fit in memory)
+    /// 1. An in-memory merge of sorted runs (if the input fit in memory)
     ///
-    /// 2. A combined streaming merge incorporating both in-memory
-    ///    batches and data from spill files on disk.
+    /// 2. A combined streaming merge incorporating sorted runs
+    ///    and data from spill files on disk.
     async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
+        self.flush_coalescer()?;
+        self.coalescer = None;
+
+        // If we spilled during the insert phase, some data is on disk
+        // and we must take the merge-from-disk path. Otherwise we can
+        // merge entirely in memory.
         if self.spilled_before() {
-            // Sort `in_mem_batches` and spill it first. If there are many
-            // `in_mem_batches` and the memory limit is almost reached, merging
-            // them with the spilled files at the same time might cause OOM.
-            if !self.in_mem_batches.is_empty() {
-                self.sort_and_spill_in_mem_batches().await?;
+            // Spill remaining sorted runs. Since runs are already sorted,
+            // each is written directly as its own spill file (no merge 
needed).
+            if self.has_sorted_runs() {
+                self.spill_sorted_runs().await?;
             }
 
-            // Transfer the pre-reserved merge memory to the streaming merge
-            // using `take()` instead of `new_empty()`. This ensures the merge
-            // stream starts with `sort_spill_reservation_bytes` already
-            // allocated, preventing starvation when concurrent sort partitions
-            // compete for pool memory. `take()` moves the bytes atomically
-            // without releasing them back to the pool, so other partitions
-            // cannot race to consume the freed memory.

Review Comment:
   This comment is quite useful; maybe we should keep it. It could help future
developers understand why `take()` is being used!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to