andygrove opened a new pull request, #1598:
URL: https://github.com/apache/datafusion-ballista/pull/1598

   # Which issue does this PR close?
   
   Closes #1432.
   
   # Rationale for this change
   
   The current sort-based shuffle writer calls `BatchPartitioner::partition()` 
on every input batch, which internally uses `take_arrays` to slice each input 
into one sub-batch per output partition and stores those slices in per-partition 
`PartitionBuffer`s. With `N` output partitions, an 8192-row input batch becomes 
up to `N` sub-batches averaging `8192 / N` rows. At `N = 200`, that is ~41 rows 
per buffered batch.
   
   These tiny batches are appended individually to `PartitionBuffer`, spilled 
individually to IPC, and written individually to the final output file. Each 
batch carries Arrow IPC framing overhead, hurts compression ratios, and 
inflates per-batch metadata cost.
   
   This PR adopts the approach used by Apache DataFusion Comet's sort-based 
shuffle: defer materialization. Whole input batches are held together with 
per-partition row-index lists, and `arrow::compute::interleave_record_batch` is 
only called at spill or final-write time, producing output batches sized up to 
the configured `batch_size`.
   
   In the same change, the per-task fixed-byte memory budget is replaced with 
`MemoryReservation` against the runtime memory pool, so the sort-shuffle writer 
participates in the same memory accounting as every other DataFusion operator.
   
   # What changes are included in this PR?
   
   **Core writer rewrite** (`ballista/core/src/execution_plans/sort_shuffle/`):
   
   - New `BufferedBatches` type holds whole input `RecordBatch`es plus 
per-partition `Vec<(batch_idx, row_idx)>` index lists. Replaces 
`PartitionBuffer`.
   - New `PartitionedBatchIterator` materializes per-partition rows into 
`batch_size`-shaped output batches via `interleave_record_batch`. This is the 
only path that copies row data, and it happens once per output batch (not once 
per input × output pair).
   - New private `compute_partition_indices` helper inlines DataFusion's hash 
logic (`evaluate_expressions_to_arrays` + `create_hashes` + 
`REPARTITION_RANDOM_STATE`). A unit test cross-checks per-row partition 
assignments against `BatchPartitioner::Hash` to catch hash-semantics drift if 
DataFusion's `REPARTITION_RANDOM_STATE` ever changes.
   - Spill is now all-or-nothing, triggered by `MemoryReservation::try_grow()` 
failure against `RuntimeEnv::memory_pool` 
(`MemoryConsumer::with_can_spill(true)`). The previous greedy "spill the 
largest buffer" against a fixed-byte threshold is removed.
   - `SpillManager::spill` becomes per-batch (`(partition_id, &RecordBatch) -> 
Result<u64>`); schema moves to the constructor.
   - `finalize_output` materializes the in-memory remainder for each partition 
through `PartitionedBatchIterator`. The Arrow IPC `FileWriter` + 
`data.arrow.index` output format is unchanged — the reader is untouched.
   - A new memory-pressure round-trip test exercises spilling under a 
tightly-bounded `FairSpillPool` and verifies (a) `spill_count > 0`, (b) 
round-trip key set matches the input, (c) `pool.reserved() == 0` after 
finalization.
   
   **Configuration cleanup**:
   
   - `SortShuffleConfig` shrinks to `enabled`, `compression`, `batch_size` 
(drops `buffer_size`, `memory_limit`, `spill_threshold`, 
`spill_memory_threshold()`).
   - Three corresponding `BALLISTA_SHUFFLE_SORT_BASED_*` session keys and their 
`BallistaConfig` accessors are removed.
   - The matching three fields on the `SortShuffleWriterExecNode` protobuf 
message are dropped (field number `8` for `batch_size` is preserved; 
gap-leaving on the wire is fine).
   
   **Standalone benchmark binary** (`benchmarks/src/bin/shuffle_bench.rs`):
   
   - Replaced with a Comet-style runner that streams from real Parquet input 
via `read_parquet`. Adds `--writer hash|sort` to select either shuffle writer, 
`--memory-limit` wired through `RuntimeEnvBuilder::with_memory_limit` (which 
now actually drives sort-shuffle spilling), `--concurrent-tasks` for 
parallelism simulation, and a warmup + iteration loop with a detailed metrics 
dump.
   - Switched from `structopt` (deprecated) to `clap`.
   - The criterion bench at `benchmarks/benches/sort_shuffle.rs` is also 
updated to wire `--memory-limit` through `RuntimeEnvBuilder` so the 
`bench_with_spill` group actually exercises spilling under the new model.
   
   # Are there any user-facing changes?
   
   Yes. **Breaking changes**:
   
   - Three session-config keys are removed: 
`ballista.shuffle.sort_based.buffer_size`, 
`ballista.shuffle.sort_based.memory_limit`, 
`ballista.shuffle.sort_based.spill_threshold`. Spill is now governed by the 
runtime memory pool (`RuntimeEnv::memory_pool`) — set a memory limit via 
`RuntimeEnvBuilder::with_memory_limit` instead.
   - `SortShuffleConfig::new` is now a 3-arg constructor (`enabled`, 
`compression`, `batch_size`).
   - `SortShuffleWriterExecNode` protobuf message drops fields 5/6/7 
(`buffer_size`, `memory_limit`, `spill_threshold`).
   - `PartitionBuffer` is replaced by the new `BufferedBatches`.
   - `SpillManager::new` takes a `SchemaRef`; `SpillManager::spill` takes a 
single `&RecordBatch`.
   
   The shuffle output file format is unchanged — readers do not need to change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

