Dandandan commented on code in PR #21629:
URL: https://github.com/apache/datafusion/pull/21629#discussion_r3088501037
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -95,116 +93,76 @@ impl ExternalSorterMetrics {
///
/// # Algorithm
///
-/// 1. get a non-empty new batch from input
+/// Incoming batches are coalesced via [`BatchCoalescer`] to
+/// `sort_coalesce_target_rows` (default 32768) before sorting. This
+/// reduces merge fan-in by producing fewer, larger sorted runs.
///
-/// 2. check with the memory manager there is sufficient space to
-/// buffer the batch in memory.
+/// Each coalesced batch is sorted immediately and stored as a
+/// pre-sorted run.
///
-/// 2.1 if memory is sufficient, buffer batch in memory, go to 1.
+/// 1. For each incoming batch:
+/// - Reserve memory (2x batch size). If reservation fails, flush
+/// the coalescer, spill all sorted runs to disk, then retry.
+/// - Push batch into the coalescer.
+/// - If the coalescer reached its target: sort the coalesced batch
+/// and store as a new sorted run.
///
-/// 2.2 if no more memory is available, sort all buffered batches and
-/// spill to file. buffer the next batch in memory, go to 1.
-///
-/// 3. when input is exhausted, merge all in memory batches and spills
-/// to get a total order.
+/// 2. When input is exhausted, merge all sorted runs (and any spill
+/// files) to produce a total order.
///
/// # When data fits in available memory
///
-/// If there is sufficient memory, data is sorted in memory to produce the
output
+/// Sorted runs are merged in memory using a loser-tree k-way merge
+/// (via [`StreamingMergeBuilder`]).
///
/// ```text
-/// ┌─────┐
-/// │ 2 │
-/// │ 3 │
-/// │ 1 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
-/// │ 4 │
-/// │ 2 │ │
-/// └─────┘ ▼
-/// ┌─────┐
-/// │ 1 │ In memory
-/// │ 4 │─ ─ ─ ─ ─ ─▶ sort/merge ─ ─ ─ ─ ─▶ total sorted output
-/// │ 1 │
-/// └─────┘ ▲
-/// ... │
-///
-/// ┌─────┐ │
-/// │ 4 │
-/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
-/// └─────┘
-///
-/// in_mem_batches
+/// ┌──────────┐ ┌────────────┐ ┌──────┐ ┌────────────┐
+/// │ Incoming │────▶│ Batch │────▶│ Sort │────▶│ Sorted Run │
+/// │ Batches │ │ Coalescer │ │ │ │ (in memory)│
+/// └──────────┘ └────────────┘ └──────┘ └─────┬──────┘
+/// │
+/// ┌──────────────┘
+/// ▼
+/// k-way merge (loser tree)
+/// │
+/// ▼
+/// total sorted output
/// ```
///
/// # When data does not fit in available memory
///
-/// When memory is exhausted, data is first sorted and written to one
-/// or more spill files on disk:
-///
-/// ```text
-/// ┌─────┐ .─────────────────.
-/// │ 2 │ ( )
-/// │ 3 │ │`─────────────────'│
-/// │ 1 │─ ─ ─ ─ ─ ─ ─ │ ┌────┐ │
-/// │ 4 │ │ │ │ 1 │░ │
-/// │ 2 │ │ │... │░ │
-/// └─────┘ ▼ │ │ 4 │░ ┌ ─ ─ │
-/// ┌─────┐ │ └────┘░ 1 │░ │
-/// │ 1 │ In memory │ ░░░░░░ │ ░░ │
-/// │ 4 │─ ─ ▶ sort/merge ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
-/// │ 1 │ and write to file │ │ ░░ │
-/// └─────┘ │ 4 │░ │
-/// ... ▲ │ └░─░─░░ │
-/// │ │ ░░░░░░ │
-/// ┌─────┐ │.─────────────────.│
-/// │ 4 │ │ ( )
-/// │ 3 │─ ─ ─ ─ ─ ─ ─ `─────────────────'
-/// └─────┘
-///
-/// in_mem_batches spills
-/// (file on disk in Arrow
-/// IPC format)
-/// ```
-///
-/// Once the input is completely read, the spill files are read and
-/// merged with any in memory batches to produce a single total sorted
-/// output:
+/// When memory is exhausted, sorted runs are spilled directly to disk
+/// (one spill file per run — no merge needed since runs are already
+/// sorted). `MultiLevelMergeBuilder` handles the final merge from disk
+/// with dynamic fan-in.
///
/// ```text
-/// .─────────────────.
-/// ( )
-/// │`─────────────────'│
-/// │ ┌────┐ │
-/// │ │ 1 │░ │
-/// │ │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
-/// │ │ 4 │░ ┌────┐ │ │
-/// │ └────┘░ │ 1 │░ │ ▼
-/// │ ░░░░░░ │ │░ │
-/// │ │... │─ ─│─ ─ ─ ▶ merge ─ ─ ─▶ total sorted output
-/// │ │ │░ │
-/// │ │ 4 │░ │ ▲
-/// │ └────┘░ │ │
-/// │ ░░░░░░ │
-/// │.─────────────────.│ │
-/// ( )
-/// `─────────────────' │
-/// spills
+/// ┌──────────┐ ┌────────────┐ ┌──────┐ ┌────────────┐
+/// │ Incoming │────▶│ Batch │────▶│ Sort │────▶│ Sorted Run │
Review Comment:
In the current approach, it would be worth it to "push down" the target
batch size to the inner coalescer (if any), so e.g. the output of the previous
stage already is of the higher batch size instead of doing another copy.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]