mbutrovich commented on issue #21543: URL: https://github.com/apache/datafusion/issues/21543#issuecomment-4297334535
> The existing tuple/string/dictionary cases use `make_sort_exprs(schema)`, which sorts by every column. Would a small benchmark-only PR adding a case that includes a table with a cheap sort-able key such as an `i64` key and non-key utf8/dictionary payload columns but ONLY gets sorted by the `i64` key be useful?

Hi @gratus00! We could definitely use more sort benchmarks, and in fact some of what you're describing I have in #21688 if you want to look at those for inspiration and bring them to a separate PR.

I had Claude summarize my notes from a weekend of looking at this. I'm kinda stumped: I can make TPC-H faster, but at the expense of other schemas I care about (e.g., unconditional coalescing hurts wide/large schemas).

# ExternalSorter Investigation Summary

## Context

PR #21688 (`externalsorter4`) rewrites ExternalSorter to coalesce batches before sorting, reducing merge fan-in. At TPC-H SF10 (~60M rows), this makes 11 queries faster (up to 1.51x) with 0 regressions. Issue #21543 tracks the broader redesign.

Four iterations were explored (es1–es4), each trying to address regressions introduced by the previous. PR #21629 (`externalsorter`, es1) is the predecessor.

## Key Findings

### 1. BATCH_SIZE matters enormously

All prior microbenchmark comparisons used BATCH_SIZE=1024.
At the realistic default of 8192, the picture flips:

| Benchmark (1M rows) | main | es5 (=es4+8192) | ratio |
|:---|---:|---:|:---|
| i64 | 34.9 | 33.4 | 0.96x (same) |
| f64 | 36.1 | 35.6 | 0.98x (same) |
| utf8 tuple (3 col) | 95.0 | 147.7 | **1.55x slower** |
| utf8 view tuple (3 col) | 81.4 | 125.1 | **1.54x slower** |
| mixed tuple | 83.9 | 127.2 | **1.52x slower** |
| mixed tuple w/ view | 75.9 | 111.3 | **1.47x slower** |
| utf8 dict tuple | 49.8 | 93.8 | **1.88x slower** |
| mixed dict tuple | 76.6 | 275.1 | **3.59x slower** |
| utf8 dict | 37.9 | 32.5 | 0.86x (faster) |
| utf8 view low card | 25.8 | 26.3 | ~same |
| utf8 high card | 65.7 | 61.3 | 0.93x (same) |

At 1024, es4 showed wins on single-column sorts (1.3-1.6x faster). At 8192, those wins disappear because main's per-batch sort already has a meaningful working set — the cache advantage of coalescing shrinks when the expansion is only 4x (8192→32768) instead of 32x (1024→32768).

Multi-column tuple regressions got worse at 8192, not better.

### 2. The microbenchmark is in the wrong regime

At 1M rows / 8192 batch size: ~122 batches → ~122 runs on main, ~30 on es4. The fan-in reduction is modest (4x), and a merge at fan-in 122 is already manageable.

At TPC-H SF10 (~60M rows): ~7300 batches → ~7300 runs on main, ~1800 on es4. The fan-in reduction matters here — multi-level merge kicks in, cursor management for 7300 streams is expensive, etc.

The crossover point where coalescing pays off is somewhere between 1M and 60M rows. The microbenchmark is below it; TPC-H is above it.

### 3. Profiling reveals double RowConverter encoding

Profiled `sort utf8 tuple 1M` on both branches (samply + Firefox Profiler, `--profile profiling`):

**main (inverted call tree, self time):**

| Self % | Function |
|---:|:---|
| 3.6% | `_platform_memmove` |
| 3.5% | `arrow_row::variable::encode_one` |
| ~1.8% | quicksort partition |
| ~1.4% | `LengthTracker::push_variable` |

**es5 (inverted call tree, self time):**

| Self % | Function |
|---:|:---|
| 7.8% | `_platform_memmove` |
| 7.6% | `arrow_row::variable::encode_one` |
| 7.6% | `arrow_row::RowConverter::append` |
| ~3.7% | `arrow_select::take::take_impl` |
| ~3.5% | sort internals |

es5 spends **2x** the time in memmove and **2x** in RowConverter encoding vs main. This is because es5 encodes to Rows twice:

1. **Sort phase**: concat key columns (memmove) → encode to Rows → sort Rows → take
2. **Merge phase**: RowCursorStream re-encodes sorted output batches to Rows for merge comparisons

The Rows from the sort phase are discarded; the merge builds new ones from scratch. ~15% of total time is wasted on this double work.

### 4. StreamingMerge only uses RowConverter for multi-column sorts

`StreamingMergeBuilder::build()` dispatches:

- **Single-column primitive** → `FieldCursorStream<PrimitiveArray>` — native comparisons, no RowConverter
- **Single-column Utf8/Utf8View/Binary** → `FieldCursorStream` — byte comparisons, no RowConverter
- **Multi-column** → `RowCursorStream` — RowConverter encoding per batch

The double-encoding problem only affects multi-column sorts — which are exactly the benchmarks with the worst regressions.

### 5. Key-only extraction doesn't help at realistic batch sizes

New key/value benchmarks (sort on key columns only, carry non-key value columns):

| Benchmark (1M) | main | es5 | ratio |
|:---|---:|---:|:---|
| i64 key, 10x utf8 view value | 46.1 | 53.7 | 1.16x slower |
| utf8 view low card key, large value | 28.0 | 32.1 | 1.15x slower |
| utf8 view high card key, large value | 51.0 | 66.5 | 1.30x slower |
| (i64, utf8 view) key, 5x f64 value | 66.5 | 98.6 | 1.48x slower |

Key-only extraction (es4's core optimization — don't touch value columns during sort) provides no benefit at 8192 batch size. The coalescing overhead exceeds the savings from avoiding value-column work.

### 6. Dictionary regressions don't matter for real workloads

- DataFusion's native Parquet readers produce StringView, not DictionaryArray
- Comet unpacks dictionaries to StringArray before ExternalSorter
- No major consumer sends DictionaryArray into the sort pipeline
- The 3.59x `mixed dict tuple` regression is a benchmark artifact

### 7. es3's StringView conversion wasn't a universal win

es3 converted StringArray→StringView internally. Results were mixed:

- Low-cardinality short strings: 1.70x faster (inlined in 16-byte views)
- High-cardinality long strings (>12 bytes): 1.12x slower (views still chase pointers)
- Dictionary columns: untouched (es3 never tried Dict→StringView)

## Proposed Architecture

Two orthogonal improvements that reduce the coalescing overhead:

### A. Incremental RowConverter::append (eliminate concat)

Current es4 (for varlen multi-column keys):

```
batch₁ keys ─┐
batch₂ keys ─┼── concat (memmove #1) ──► big key array ──► RowConverter::append (memmove #2) ──► Rows
batch₃ keys ─┘
```

Proposed:

```
batch₁ keys ──► RowConverter::append ──┐
batch₂ keys ──► RowConverter::append ──┼──► Rows (one memmove)
batch₃ keys ──► RowConverter::append ──┘
```

`RowConverter::append` already supports incremental extension. This eliminates the key-column concat entirely.

### B. Pass Rows from sort phase to merge phase (eliminate double encoding)

Current: sort encodes to Rows → sort → discard Rows → merge re-encodes to Rows

Proposed: sort encodes to Rows → sort → pass Rows to merge → merge uses them directly

This requires plumbing changes in StreamingMergeBuilder to accept pre-encoded Rows via a new `PartitionedStream` implementation. It only applies when the merge uses `RowCursorStream` (multi-column sorts).

### Combined effect

Eliminates 3 copies/encodings → 1:

1. ~~concat key columns~~ (removed by A)
2. encode for sort (kept — this is the one encoding)
3. ~~re-encode for merge~~ (removed by B)

From the profile, this could save ~15% of total time (7.8% memmove + 7.6% encoding), which would shift the crossover point lower and potentially make coalescing viable even at 1M rows.
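The incremental-append idea in (A) can be sketched with a toy, self-contained stand-in for arrow-rs's `RowConverter::append`. The `ToyRows` struct and its encoding (big-endian u64 followed by raw string bytes) are illustrative only, not arrow's actual row format; the point is that each batch is encoded directly into one growing buffer, with no cross-batch concat first, and that the resulting rows sort with a plain bytewise comparison:

```rust
// Toy stand-in for incremental row encoding: each `append_batch` call
// extends one growing buffer, so key columns are encoded once per batch
// with no cross-batch concat beforehand. The encoding here is a
// simplification; arrow-rs's real row format also handles nulls,
// variable-length framing, sort options, etc.
#[derive(Default)]
struct ToyRows {
    data: Vec<u8>,
    offsets: Vec<usize>, // row i = data[offsets[i]..offsets[i + 1]]
}

impl ToyRows {
    fn append_batch(&mut self, keys: &[(u64, &str)]) {
        if self.offsets.is_empty() {
            self.offsets.push(0);
        }
        for (num, s) in keys {
            // order-preserving: a big-endian integer sorts bytewise
            // the same way it sorts numerically
            self.data.extend_from_slice(&num.to_be_bytes());
            self.data.extend_from_slice(s.as_bytes());
            self.offsets.push(self.data.len());
        }
    }

    fn row(&self, i: usize) -> &[u8] {
        &self.data[self.offsets[i]..self.offsets[i + 1]]
    }

    fn len(&self) -> usize {
        self.offsets.len().saturating_sub(1)
    }
}

fn main() {
    let mut rows = ToyRows::default();
    // batches arrive one at a time; no concat of key columns first
    rows.append_batch(&[(3, "c"), (1, "b")]);
    rows.append_batch(&[(1, "a"), (2, "z")]);

    // sort a permutation by bytewise comparison of the encoded rows —
    // this is how Rows-based sorting avoids per-column comparators
    let mut perm: Vec<usize> = (0..rows.len()).collect();
    perm.sort_by(|&a, &b| rows.row(a).cmp(rows.row(b)));
    assert_eq!(perm, vec![2, 1, 3, 0]); // (1,"a"), (1,"b"), (2,"z"), (3,"c")
    println!("{:?}", perm);
}
```

In the real pipeline the analogous calls are `RowConverter::empty_rows` plus repeated `RowConverter::append`, and the sorted permutation is then applied with take/interleave.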
### Scope and limitations

- Only applies to multi-column sort keys (single-column sorts don't use RowConverter in the merge)
- Only applies to the RowConverter path (varlen keys) — the lexsort path for fixed-width keys still needs concat
- Reconstruction of output columns (take/interleave) is unchanged
- Requires changes to the `PartitionedStream` trait and `StreamingMergeBuilder` API

## Encode-Once Implementation and Profiling (branch: sort_rows)

### What we built

Implemented the encode-once architecture on branch `sort_rows`:

- `uses_row_cursor()`: shared dispatch matching `StreamingMergeBuilder::build()` — single-column primitive/string sorts stay on main's path, multi-column sorts use the new row-encoded path
- `AccumulationBuffer`: incremental `RowConverter::append` as batches arrive
- `SortedRun`: deferred materialization — stores original batches + sorted permutation + pre-encoded Rows
- `SortedRunStream`: a `PartitionedStream` that lazily materializes batches and provides pre-computed `RowValues` to the merge (no re-encoding)
- Schema-dependent reconstruction: concat+take for StringArray, interleave for StringView/fixed-width
- `sort_coalesce_target_rows` config (default 32768)

### Profiling results (sort utf8 tuple 1M, BATCH_SIZE=8192)

Profiled with samply on `sort utf8 tuple 1M` (3-column StringArray, the worst-case schema for data movement). Compared four variants:

**main** (per-batch sort, no coalescing):

| Self % | Function |
|---:|:---|
| 3.6% | `_platform_memmove` |
| 3.5% | `arrow_row::variable::encode_one` (merge phase only) |
| ~1.8% | quicksort partition |

**es5** (es4 approach: concat keys + RowConverter sort + take/interleave):

| Self % | Function |
|---:|:---|
| 7.8% | `_platform_memmove` (concat + take) |
| 7.6% | `arrow_row::variable::encode_one` (sort phase) |
| 7.6% | `RowConverter::append` (sort phase) |
| 3.7% | `take_impl` |

**sort_rows v1** (encode-once, interleave from originals):

| Self % | Function |
|---:|:---|
| 19% | `_platform_memmove` |
| 12% | `interleave_bytes` (flush + merge — double interleave) |
| 4.4% | `encode_one` (accumulation phase only) |
| 14% | `RowConverter::append` (accumulation — the one encode) |

**sort_rows v2** (encode-once, concat+take for StringArray):

| Self % | Function |
|---:|:---|
| 21% | `_platform_memmove` |
| 4% | `take_bytes` (from the concat'd batch) |
| 6.1% | `interleave_bytes` (merge output only — unavoidable) |
| 6% | `encode_one` |
| 0.4% | `concat_batches` |

### What we learned

**The encode-once goal was achieved.** RowConverter encoding dropped from double (es5: sort + merge) to single (sort_rows: accumulation only). The merge uses pre-computed Rows via `SortedRunStream`, completely bypassing `RowCursorStream`'s per-batch encoding.

**But the reconstruction cost dominates.** At 1M rows, moving string bytes from unsorted batches to sorted output is the bottleneck, not encoding. We tried three reconstruction strategies:

1. **Interleave from originals** (v1): 12% `interleave_bytes` — scatter-gather across 4 source batch string buffers
2. **Concat + take** (v2): 0.4% concat + 4% take_bytes — sequential concat into one buffer, then indexed access
3. **StringView intermediate** (discussed, not implemented): would have the same random-access pattern as (1) when converting back to StringArray

All strategies move the same total bytes. The profile shapes are nearly identical — the work just shifts between functions. The fundamental cost is O(N × avg_string_length) string copying in permutation order, regardless of approach.

**The double interleave problem.** sort_rows v1 had two interleave passes:

1. `SortedRun::next_chunk` → interleave from originals (materializing the run)
2. `BatchBuilder::build_record_batch` → interleave from runs (merge output)

Deferred materialization (not eagerly building RecordBatches in flush) didn't help — the lazy materialization in `next_chunk` does the same work, just later. The merge's `BatchBuilder` interleave exists on main too and is unavoidable.

**The crossover point is scale-dependent.** At 1M rows / 8192 batch size, main produces 122 runs; our approach produces ~30 runs. The merge fan-in reduction (122→30) saves merge comparison work, but the coalescing overhead (concat or interleave + Rows encoding + Rows gathering) exceeds the savings. At TPC-H SF10 (~60M rows, 7300→1800 runs), the merge savings dominate and the approach wins (+15% total query time).

### Fundamental insight

For StringArray-heavy schemas at moderate scale, the sort pipeline's cost is dominated by **data movement** (physically copying string bytes), not **comparison** (sort/merge key encoding). The encode-once architecture successfully eliminates redundant encoding work, but encoding was only ~15% of the total. The remaining 85% is moving bytes around, and no intermediate representation change (StringView, Rows, concat) reduces the total bytes that must move.

Main's approach — sort each batch independently within its own buffer, then merge — minimizes cross-batch data movement. Each batch's sort is entirely within one contiguous buffer (cache-friendly). The only cross-batch operation is the merge output, which is unavoidable.

Coalescing adds cross-batch data movement (concat, interleave, or take-from-concat) in exchange for fewer merge runs. The tradeoff only pays off when the merge cost (proportional to fan-in) dominates the data movement cost (proportional to data size). This happens at large scale (TPC-H SF10) but not at moderate scale (1M rows).

## Conclusions and Recommendations

### The core finding

ExternalSorter's per-batch sort + merge architecture on main is already well-suited for moderate-scale workloads (up to ~10M rows) at realistic batch sizes (8192). The bottleneck at this scale is **data movement** (physically copying string bytes), not comparison or encoding overhead. No intermediate representation change (Rows, StringView, concat) reduces the total bytes that must move.

At large scale (TPC-H SF10, ~60M rows), **merge fan-in** becomes the bottleneck. Reducing fan-in from 7300 to 1800 runs gives a 15% speedup. But the cheapest way to reduce fan-in is to increase the input batch size, not to coalesce inside the sort.

### For DataFusion

**No ExternalSorter changes recommended at this time.** The current implementation is efficient. The encode-once architecture (branch `sort_rows`) successfully eliminates double RowConverter encoding in the merge, but the savings (~15% of pipeline time) are offset by the additional data movement cost of coalescing at moderate scale.

When `lexsort_radix` lands in arrow-rs (#9683), it provides 1.5-2.5x faster sorting on Rows at 32K+ rows. The encode-once architecture on `sort_rows` provides the infrastructure to adopt it (incremental `RowConverter::append`, `SortedRun` with permutation, `SortedRunStream` for merge handoff). The radix sort may shift the break-even point for coalescing to a lower scale, making the approach viable for moderate workloads. This should be revisited when the kernel is available.
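The fan-in arithmetic behind these numbers is simple enough to check directly. Below is a hypothetical back-of-envelope helper (not DataFusion code): the sorter produces roughly one sorted run per input batch, so the run count is just total rows divided by rows per run, rounded up:

```rust
// Hypothetical helper (not DataFusion code): the sorter emits roughly one
// sorted run per input batch, so merge fan-in is approximately
// total rows / rows per run, rounded up.
fn runs(total_rows: u64, rows_per_run: u64) -> u64 {
    total_rows.div_ceil(rows_per_run)
}

fn main() {
    // microbenchmark scale: 1M rows
    println!("{}", runs(1_000_000, 8_192)); // 123 (the ~122 quoted above)
    println!("{}", runs(1_000_000, 32_768)); // 31 (~30)
    // TPC-H SF10 scale: ~60M rows
    println!("{}", runs(60_000_000, 8_192)); // 7325 (~7300)
    println!("{}", runs(60_000_000, 32_768)); // 1832 (~1800)
}
```

Raising the batch size from 8192 to 32768 cuts fan-in by the same ~4x as coalescing, which is why the recommendation below favors bigger input batches over sort-internal coalescing.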
### For Comet

**Increase batch size rather than redesign the sort.** Comet sees performance wins at a 32768 batch size (up from the 8192 default). At 60M rows, this reduces merge fan-in from 7300 to 1800 — the same improvement that es4's coalescing achieved, but without any sort-internal overhead.

The simplest approach: have Comet's shuffle writer produce larger batches when feeding a CometSort. The planner knows the plan topology and can set the batch size based on the downstream operator. Sort wants large batches (fewer runs); other operators may prefer smaller batches for pipelining.

Alternatively: increase Comet's default batch size globally to 32768 and validate that nothing regresses. The sort and merge both handle 32K batches efficiently.

A Comet-specific sort operator may still be worthwhile for other reasons (working directly with Spark's row format, spilling in shuffle format, JNI-aware memory), but the fan-in problem is solvable without one.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
