mbutrovich commented on issue #21543:
URL: https://github.com/apache/datafusion/issues/21543#issuecomment-4297334535

   > The existing tuple/string/dictionary cases use `make_sort_exprs(schema)`, which sorts by every column. Would a small benchmark-only PR be useful that adds a case with a cheap sortable key (such as an `i64`) and non-key utf8/dictionary payload columns, where the table ONLY gets sorted by the `i64` key?
   
   Hi @gratus00! We could definitely use more sort benchmarks. Some of what you're describing is already in #21688, so feel free to look at those for inspiration and bring them into a separate PR.
   
   I had Claude summarize my notes from looking at this over the weekend. I'm kinda stumped: I can make TPC-H faster, but at the expense of other schemas I care about (e.g., unconditional coalescing hurts wide/large schemas).
   
   > # ExternalSorter Investigation Summary
   > 
   > ## Context
   > 
   > PR #21688 (`externalsorter4`) rewrites ExternalSorter to coalesce batches before sorting, reducing merge fan-in. At TPC-H SF10 (~60M rows), this makes 11 queries faster (up to 1.51x) with 0 regressions. Issue #21543 tracks the broader redesign.
   > 
   > Four iterations were explored (es1–es4), each trying to address 
regressions introduced by the previous. PR #21629 (`externalsorter`, es1) is 
the predecessor.
   > 
   > ## Key Findings
   > 
   > ### 1. BATCH_SIZE matters enormously
   > 
   > All prior microbenchmark comparisons used BATCH_SIZE=1024. At the 
realistic default of 8192, the picture flips:
   > 
   > | Benchmark (1M rows) | main | es5 (=es4+8192) | ratio |
   > |:---|---:|---:|:---|
   > | i64 | 34.9 | 33.4 | 0.96x (same) |
   > | f64 | 36.1 | 35.6 | 0.98x (same) |
   > | utf8 tuple (3 col) | 95.0 | 147.7 | **1.55x slower** |
   > | utf8 view tuple (3 col) | 81.4 | 125.1 | **1.54x slower** |
   > | mixed tuple | 83.9 | 127.2 | **1.52x slower** |
   > | mixed tuple w/ view | 75.9 | 111.3 | **1.47x slower** |
   > | utf8 dict tuple | 49.8 | 93.8 | **1.88x slower** |
   > | mixed dict tuple | 76.6 | 275.1 | **3.59x slower** |
   > | utf8 dict | 37.9 | 32.5 | 0.86x (faster) |
   > | utf8 view low card | 25.8 | 26.3 | ~same |
   > | utf8 high card | 65.7 | 61.3 | 0.93x (same) |
   > 
   > At 1024, es4 showed wins on single-column sorts (1.3-1.6x faster). At 
8192, those wins disappear because main's per-batch sort already has a 
meaningful working set — the cache advantage of coalescing shrinks when the 
expansion is only 4x (8192→32768) instead of 32x (1024→32768).
   > 
   > Multi-column tuple regressions got worse at 8192, not better.
   > 
   > ### 2. The microbenchmark is in the wrong regime
   > 
   > At 1M rows / 8192 batch size: ~122 batches → ~122 runs on main, ~30 on 
es4. Fan-in reduction is modest (4x) and the merge at fan-in 122 is already 
manageable.
   > 
   > At TPC-H SF10 (~60M rows): ~7300 batches → ~7300 runs on main, ~1800 on 
es4. The fan-in reduction matters here — multi-level merge kicks in, cursor 
management for 7300 streams is expensive, etc.
   > 
   > The crossover point where coalescing pays off is somewhere between 1M and 
60M rows. The microbenchmark is below it; TPC-H is above it.
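   > 
   > The run-count arithmetic behind these numbers, as a runnable sketch (the 32768-row run size matches the coalescing target; counts are rounded in the text above):
   > 
   > ```rust
   > // Back-of-envelope run counts: one sorted run per input batch on main,
   > // one per coalesced 32768-row chunk on es4.
   > fn runs(total_rows: u64, rows_per_run: u64) -> u64 {
   >     total_rows.div_ceil(rows_per_run)
   > }
   > 
   > fn main() {
   >     assert_eq!(runs(1_000_000, 8_192), 123);     // main, 1M rows (~122)
   >     assert_eq!(runs(1_000_000, 32_768), 31);     // es4, 1M rows (~30)
   >     assert_eq!(runs(60_000_000, 8_192), 7_325);  // main, SF10 (~7300)
   >     assert_eq!(runs(60_000_000, 32_768), 1_832); // es4, SF10 (~1800)
   > }
   > ```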
   > 
   > ### 3. Profiling reveals double RowConverter encoding
   > 
   > Profiled `sort utf8 tuple 1M` on both branches (samply + Firefox Profiler, 
`--profile profiling`):
   > 
   > **main (inverted call tree, self time):**
   > | Self % | Function |
   > |---:|:---|
   > | 3.6% | `_platform_memmove` |
   > | 3.5% | `arrow_row::variable::encode_one` |
   > | ~1.8% | quicksort partition |
   > | ~1.4% | `LengthTracker::push_variable` |
   > 
   > **es5 (inverted call tree, self time):**
   > | Self % | Function |
   > |---:|:---|
   > | 7.8% | `_platform_memmove` |
   > | 7.6% | `arrow_row::variable::encode_one` |
   > | 7.6% | `arrow_row::RowConverter::append` |
   > | ~3.7% | `arrow_select::take::take_impl` |
   > | ~3.5% | sort internals |
   > 
   > es5 spends **2x** the time in memmove and **2x** in RowConverter encoding 
vs main. This is because es5 encodes to Rows twice:
   > 1. **Sort phase**: concat key columns (memmove) → encode to Rows → sort 
Rows → take
   > 2. **Merge phase**: RowCursorStream re-encodes sorted output batches to 
Rows for merge comparisons
   > 
   > The Rows from the sort phase are discarded. The merge builds new ones from 
scratch. ~15% of total time is wasted on this double work.
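   > 
   > A minimal sketch of the sort-phase encoding that produces the discarded Rows, simplified to a single Utf8 key (not the actual ExternalSorter code, but the `arrow::row` calls are the real API):
   > 
   > ```rust
   > use arrow::array::{ArrayRef, UInt32Array};
   > use arrow::compute::take;
   > use arrow::datatypes::DataType;
   > use arrow::error::ArrowError;
   > use arrow::row::{RowConverter, SortField};
   > 
   > fn sort_phase(keys: ArrayRef) -> Result<ArrayRef, ArrowError> {
   >     let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)])?;
   >     // Encoding #1: key columns -> Rows (the memmove + encode_one in the profile).
   >     let rows = converter.convert_columns(&[keys.clone()])?;
   >     let mut perm: Vec<u32> = (0..rows.num_rows() as u32).collect();
   >     perm.sort_unstable_by(|&a, &b| rows.row(a as usize).cmp(&rows.row(b as usize)));
   >     // `rows` is dropped here; RowCursorStream re-encodes the sorted
   >     // output batches for the merge (encoding #2).
   >     take(keys.as_ref(), &UInt32Array::from(perm), None)
   > }
   > ```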
   > 
   > ### 4. StreamingMerge only uses RowConverter for multi-column sorts
   > 
   > `StreamingMergeBuilder::build()` dispatches:
   > - **Single-column primitive** → `FieldCursorStream<PrimitiveArray>` — 
native comparisons, no RowConverter
   > - **Single-column Utf8/Utf8View/Binary** → `FieldCursorStream` — byte 
comparisons, no RowConverter  
   > - **Multi-column** → `RowCursorStream` — RowConverter encoding per batch
   > 
   > The double-encoding problem only affects multi-column sorts — which are 
exactly the benchmarks with the worst regressions.
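   > 
   > An illustrative mirror of that dispatch (covering only a few representative types; the real logic in `StreamingMergeBuilder::build()` handles all primitive and byte-comparable types):
   > 
   > ```rust
   > use arrow::datatypes::DataType;
   > 
   > /// Returns true when the merge would fall back to RowCursorStream
   > /// and pay per-batch RowConverter encoding.
   > fn uses_row_converter(key_types: &[DataType]) -> bool {
   >     match key_types {
   >         // Single-column primitive or byte-comparable keys get a
   >         // FieldCursorStream: native/byte comparisons, no RowConverter.
   >         [DataType::Int64 | DataType::Float64]
   >         | [DataType::Utf8 | DataType::Utf8View | DataType::Binary] => false,
   >         // Multi-column keys use RowCursorStream.
   >         _ => true,
   >     }
   > }
   > ```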
   > 
   > ### 5. Key-only extraction doesn't help at realistic batch sizes
   > 
   > New key/value benchmarks (sort on key columns only, carry non-key value 
columns):
   > 
   > | Benchmark (1M) | main | es5 | ratio |
   > |:---|---:|---:|:---|
   > | i64 key, 10x utf8 view value | 46.1 | 53.7 | 1.16x slower |
   > | utf8 view low card key, large value | 28.0 | 32.1 | 1.15x slower |
   > | utf8 view high card key, large value | 51.0 | 66.5 | 1.30x slower |
   > | (i64, utf8 view) key, 5x f64 value | 66.5 | 98.6 | 1.48x slower |
   > 
   > Key-only extraction (es4's core optimization — don't touch value columns 
during sort) provides no benefit at 8192 batch size. The coalescing overhead 
exceeds the savings from avoiding value column work.
   > 
   > ### 6. Dictionary regressions don't matter for real workloads
   > 
   > - DataFusion native Parquet readers produce StringView, not DictionaryArray
   > - Comet unpacks dictionaries to StringArray before ExternalSorter
   > - No major consumer sends DictionaryArray into the sort pipeline
   > - The 3.59x `mixed dict tuple` regression is a benchmark artifact
   > 
   > ### 7. es3's StringView conversion wasn't a universal win
   > 
   > es3 converted StringArray→StringView internally. Results were mixed (the sketch below shows why the 12-byte boundary matters):
   > - Low-cardinality short strings: 1.70x faster (values inline in the 16-byte views)
   > - High-cardinality long strings (>12 bytes): 1.12x slower (views still chase buffer pointers)
   > - Dictionary columns: untouched (es3 never tried Dict→StringView)
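   > 
   > A small illustration of the Utf8View layout behind that split: each view is a fixed 16 bytes, strings of at most 12 bytes are stored entirely inline, and longer strings keep only a 4-byte prefix inline plus a buffer reference.
   > 
   > ```rust
   > use arrow::array::StringViewArray;
   > 
   > fn main() {
   >     let arr = StringViewArray::from(vec![
   >         "inline",                             // <=12 bytes: stored fully inline
   >         "well over twelve bytes of key data", // prefix inline, rest in buffer
   >     ]);
   >     // Comparing the first value never touches the data buffer;
   >     // comparing the second must chase the buffer pointer past its prefix.
   >     assert!(arr.value(0) < arr.value(1));
   > }
   > ```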
   > 
   > ## Proposed Architecture
   > 
   > Two orthogonal improvements that reduce the coalescing overhead:
   > 
   > ### A. Incremental RowConverter::append (eliminate concat)
   > 
   > Current es4 (for varlen multi-column keys):
   > ```
   > batch₁ keys ─┐
   > batch₂ keys ─┼── concat (memmove #1) ──► big key array ──► RowConverter::append (memmove #2) ──► Rows
   > batch₃ keys ─┘
   > ```
   > 
   > Proposed:
   > ```
   > batch₁ keys ──► RowConverter::append ──┐
   > batch₂ keys ──► RowConverter::append ──┼──► Rows  (one memmove)
   > batch₃ keys ──► RowConverter::append ──┘
   > ```
   > 
   > `RowConverter::append` already supports incremental extension. This 
eliminates the key column concat entirely.
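   > 
   > A minimal sketch of that per-batch append (real `arrow::row` API; the capacities passed to `empty_rows` are illustrative guesses):
   > 
   > ```rust
   > use std::sync::Arc;
   > use arrow::array::{ArrayRef, StringArray};
   > use arrow::datatypes::DataType;
   > use arrow::error::ArrowError;
   > use arrow::row::{RowConverter, SortField};
   > 
   > fn main() -> Result<(), ArrowError> {
   >     let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)])?;
   >     let mut rows = converter.empty_rows(32_768, 1 << 20);
   >     for batch_keys in [
   >         Arc::new(StringArray::from(vec!["b", "a"])) as ArrayRef,
   >         Arc::new(StringArray::from(vec!["d", "c"])) as ArrayRef,
   >     ] {
   >         // One encode per batch, extended in place: the key-column concat
   >         // (memmove #1 in the diagram above) never happens.
   >         converter.append(&mut rows, &[batch_keys])?;
   >     }
   >     assert_eq!(rows.num_rows(), 4);
   >     Ok(())
   > }
   > ```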
   > 
   > ### B. Pass Rows from sort phase to merge phase (eliminate double encoding)
   > 
   > Current: sort encodes to Rows → sort → discard Rows → merge re-encodes to 
Rows
   > 
   > Proposed: sort encodes to Rows → sort → pass Rows to merge → merge uses 
them directly
   > 
   > This requires plumbing changes in StreamingMergeBuilder to accept 
pre-encoded Rows via a new `PartitionedStream` implementation. Only applies 
when the merge uses `RowCursorStream` (multi-column sorts).
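   > 
   > The `PartitionedStream` trait is crate-private, so this is only a hedged sketch of the handoff shape: each sorted run carries its batches together with the Rows built during the sort, and the merge consumes the pair instead of re-encoding.
   > 
   > ```rust
   > use arrow::record_batch::RecordBatch;
   > use arrow::row::Rows;
   > 
   > /// Hypothetical carrier for one sorted run (illustrative names only).
   > struct PreEncodedRun {
   >     batches: Vec<RecordBatch>, // sorted output batches of the run
   >     rows: Vec<Rows>,           // Rows encoded during the sort, one per batch
   > }
   > 
   > impl PreEncodedRun {
   >     /// Yields exactly what RowCursorStream would otherwise recompute
   >     /// per batch during the merge.
   >     fn next_pair(&mut self) -> Option<(RecordBatch, Rows)> {
   >         let batch = self.batches.pop()?;
   >         let rows = self.rows.pop()?;
   >         Some((batch, rows))
   >     }
   > }
   > ```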
   > 
   > ### Combined effect
   > 
   > Reduces three copies/encodings to one:
   > 1. ~~concat key columns~~ (removed by A)
   > 2. encode for sort (kept: this is the one encoding)
   > 3. ~~re-encode for merge~~ (removed by B)
   > 
   > From the profile, this could save ~15% of total time (7.8% memmove + 7.6% 
encoding), which would shift the crossover point lower and potentially make 
coalescing viable even at 1M rows.
   > 
   > ### Scope and limitations
   > 
   > - Only applies to multi-column sort keys (single-column sorts don't use 
RowConverter in merge)
   > - Only applies to the RowConverter path (varlen keys) — lexsort path for 
fixed-width keys still needs concat
   > - Reconstruction of output columns (take/interleave) is unchanged
   > - Requires changes to `PartitionedStream` trait and 
`StreamingMergeBuilder` API
   > 
   > ## Encode-Once Implementation and Profiling (branch: sort_rows)
   > 
   > ### What we built
   > 
   > Implemented the encode-once architecture on branch `sort_rows`:
   > - `uses_row_cursor()`: shared dispatch matching 
`StreamingMergeBuilder::build()` — single-column primitive/string sorts stay on 
main's path, multi-column sorts use the new row-encoded path
   > - `AccumulationBuffer`: incremental `RowConverter::append` as batches 
arrive
   > - `SortedRun`: deferred materialization — stores original batches + sorted 
permutation + pre-encoded Rows
   > - `SortedRunStream`: `PartitionedStream` that lazily materializes batches 
and provides pre-computed `RowValues` to the merge (no re-encoding)
   > - Schema-dependent reconstruction: concat+take for StringArray, interleave 
for StringView/fixed-width
   > - `sort_coalesce_target_rows` config (default 32768)
   > 
   > ### Profiling results (sort utf8 tuple 1M, BATCH_SIZE=8192)
   > 
   > Profiled with samply on `sort utf8 tuple 1M` (3-column StringArray, the 
worst-case schema for data movement). Compared four variants:
   > 
   > **main** (per-batch sort, no coalescing):
   > | Self % | Function |
   > |---:|:---|
   > | 3.6% | `_platform_memmove` |
   > | 3.5% | `arrow_row::variable::encode_one` (merge phase only) |
   > | ~1.8% | quicksort partition |
   > 
   > **es5** (es4 approach: concat keys + RowConverter sort + take/interleave):
   > | Self % | Function |
   > |---:|:---|
   > | 7.8% | `_platform_memmove` (concat + take) |
   > | 7.6% | `arrow_row::variable::encode_one` (sort phase) |
   > | 7.6% | `RowConverter::append` (sort phase) |
   > | 3.7% | `take_impl` |
   > 
   > **sort_rows v1** (encode-once, interleave from originals):
   > | Self % | Function |
   > |---:|:---|
   > | 19% | `_platform_memmove` |
   > | 12% | `interleave_bytes` (flush + merge — double interleave) |
   > | 4.4% | `encode_one` (accumulation phase only) |
   > | 14% | `RowConverter::append` (accumulation — the one encode) |
   > 
   > **sort_rows v2** (encode-once, concat+take for StringArray):
   > | Self % | Function |
   > |---:|:---|
   > | 21% | `_platform_memmove` |
   > | 4% | `take_bytes` (from concat'd batch) |
   > | 6.1% | `interleave_bytes` (merge output only — unavoidable) |
   > | 6% | `encode_one` |
   > | 0.4% | `concat_batches` |
   > 
   > ### What we learned
   > 
   > **The encode-once goal was achieved.** RowConverter encoding dropped from 
double (es5: sort + merge) to single (sort_rows: accumulation only). The merge 
uses pre-computed Rows via `SortedRunStream`, completely bypassing 
`RowCursorStream`'s per-batch encoding.
   > 
   > **But the reconstruction cost dominates.** At 1M rows, moving string bytes 
from unsorted batches to sorted output is the bottleneck, not encoding. We 
tried three reconstruction strategies:
   > 1. **Interleave from originals** (v1): 12% `interleave_bytes` — 
scatter-gather across 4 source batch string buffers
   > 2. **Concat + take** (v2): 0.4% concat + 4% take_bytes — sequential concat 
into one buffer, then indexed access
   > 3. **StringView intermediate** (discussed, not implemented): would have 
same random-access pattern as (1) when converting back to StringArray
   > 
   > All strategies move the same total bytes. The profile shapes are nearly 
identical — the work just shifts between functions. The fundamental cost is O(N 
× avg_string_length) string copying in permutation order, regardless of 
approach.
   > 
   > **The double interleave problem.** sort_rows v1 had two interleave passes:
   > 1. `SortedRun::next_chunk` → interleave from originals (materializing the 
run)
   > 2. `BatchBuilder::build_record_batch` → interleave from runs (merge output)
   > 
   > Deferred materialization (not eagerly building RecordBatches in flush) 
didn't help — the lazy materialization in `next_chunk` does the same work, just 
later. The merge's `BatchBuilder` interleave exists on main too and is 
unavoidable.
   > 
   > **The crossover point is scale-dependent.** At 1M rows / 8192 batch size, main produces ~122 runs; our approach produces ~30. The merge fan-in reduction (122→30) saves merge comparison work, but the coalescing overhead (concat or interleave + Rows encoding + Rows gathering) exceeds the savings. At TPC-H SF10 (~60M rows, 7300→1800 runs), the merge savings dominate and the approach wins (~15% faster total query time).
   > 
   > ### Fundamental insight
   > 
   > For StringArray-heavy schemas at moderate scale, the sort pipeline's cost 
is dominated by **data movement** (physically copying string bytes), not 
**comparison** (sort/merge key encoding). The encode-once architecture 
successfully eliminates redundant encoding work, but encoding was only ~15% of 
the total. The remaining 85% is moving bytes around, and no intermediate 
representation change (StringView, Rows, concat) reduces the total bytes that 
must move.
   > 
   > Main's approach — sort each batch independently within its own buffer, 
then merge — minimizes cross-batch data movement. Each batch's sort is entirely 
within one contiguous buffer (cache-friendly). The only cross-batch operation 
is the merge output, which is unavoidable.
   > 
   > Coalescing adds cross-batch data movement (concat, interleave, or 
take-from-concat) in exchange for fewer merge runs. The tradeoff only pays off 
when the merge cost (proportional to fan-in) dominates the data movement cost 
(proportional to data size). This happens at large scale (TPC-H SF10) but not 
at moderate scale (1M rows).
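   > 
   > That scaling argument as a toy model (constants are made up; only the shapes matter: at a fixed run size, fan-in grows with scale, so merge savings grow superlinearly while movement overhead grows only linearly with data size):
   > 
   > ```rust
   > fn main() {
   >     for rows in [1_000_000u64, 60_000_000] {
   >         let fan_in_main = rows / 8_192;       // runs without coalescing
   >         let fan_in_coalesced = rows / 32_768; // runs with 32K coalescing
   >         // Merge cost modeled as proportional to rows * fan-in (per the
   >         // "proportional to fan-in" claim above), so savings scale with both.
   >         let merge_savings = rows as f64 * (fan_in_main - fan_in_coalesced) as f64;
   >         // Movement overhead: each row's bytes copied one extra time, so it
   >         // scales linearly with data size (the constant is arbitrary).
   >         let movement_overhead = rows as f64 * 1_000.0;
   >         // Prints a net loss at 1M rows and a net win at 60M rows.
   >         println!(
   >             "{rows} rows: merge savings {merge_savings:.1e} vs movement {movement_overhead:.1e}"
   >         );
   >     }
   > }
   > ```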
   > 
   > ## Conclusions and Recommendations
   > 
   > ### The core finding
   > 
   > ExternalSorter's per-batch sort + merge architecture on main is already 
well-suited for moderate-scale workloads (up to ~10M rows) at realistic batch 
sizes (8192). The bottleneck at this scale is **data movement** (physically 
copying string bytes), not comparison or encoding overhead. No intermediate 
representation change (Rows, StringView, concat) reduces the total bytes that 
must move.
   > 
   > At large scale (TPC-H SF10, ~60M rows), **merge fan-in** becomes the 
bottleneck. Reducing fan-in from 7300 to 1800 runs gives a 15% speedup. But the 
cheapest way to reduce fan-in is to increase input batch size, not to coalesce 
inside the sort.
   > 
   > ### For DataFusion
   > 
   > **No ExternalSorter changes recommended at this time.** The current 
implementation is efficient. The encode-once architecture (branch `sort_rows`) 
successfully eliminates double RowConverter encoding in the merge, but the 
savings (~15% of pipeline time) are offset by the additional data movement cost 
of coalescing at moderate scale.
   > 
   > When `lexsort_radix` lands in arrow-rs (#9683), it provides 1.5-2.5x 
faster sorting on Rows at 32K+ rows. The encode-once architecture on 
`sort_rows` provides the infrastructure to adopt it (incremental 
`RowConverter::append`, `SortedRun` with permutation, `SortedRunStream` for 
merge handoff). The radix sort may shift the break-even point for coalescing to 
a lower scale, making the approach viable for moderate workloads. This should 
be revisited when the kernel is available.
   > 
   > ### For Comet
   > 
   > **Increase batch size rather than redesign the sort.** Comet sees 
performance wins at 32768 batch size (up from 8192 default). At 60M rows, this 
reduces merge fan-in from 7300 to 1800 — the same improvement that es4's 
coalescing achieved, but without any sort-internal overhead.
   > 
   > The simplest approach: have Comet's shuffle writer produce larger batches 
when feeding a CometSort. The planner knows the plan topology and can set batch 
size based on the downstream operator. Sort wants large batches (fewer runs); 
other operators may prefer smaller batches for pipelining.
   > 
   > Alternatively: increase Comet's default batch size globally to 32768 and 
validate that nothing regresses. The sort and merge both handle 32K batches 
efficiently.
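   > 
   > For reference, in plain DataFusion the equivalent knob is the session-level batch size (the real `SessionConfig` API, shown below; Comet would set the analogous Spark-side configuration):
   > 
   > ```rust
   > use datafusion::prelude::{SessionConfig, SessionContext};
   > 
   > fn main() {
   >     // Larger batches mean fewer sorted runs and lower merge fan-in.
   >     let config = SessionConfig::new().with_batch_size(32_768);
   >     let _ctx = SessionContext::new_with_config(config);
   > }
   > ```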
   > 
   > A Comet-specific sort operator may still be worthwhile for other reasons 
(working directly with Spark's row format, spilling in shuffle format, 
JNI-aware memory), but the fan-in problem is solvable without one.

