andygrove opened a new pull request, #1598:
URL: https://github.com/apache/datafusion-ballista/pull/1598
# Which issue does this PR close?

Closes #1432.

# Rationale for this change

The current sort-based shuffle writer calls `BatchPartitioner::partition()` on every input batch, which internally `take_arrays`-slices each input into one sub-batch per output partition and stores those slices in per-partition `PartitionBuffer`s. With `N` output partitions, an 8192-row input batch becomes up to `N` sub-batches averaging `8192 / N` rows. At `N = 200`, that is ~41 rows per buffered batch. These tiny batches are appended individually to `PartitionBuffer`, spilled individually to IPC, and written individually to the final output file. Each batch carries Arrow IPC framing overhead, hurts compression ratios, and inflates per-batch metadata cost.

This PR adopts the approach used by Apache DataFusion Comet's sort-based shuffle: defer materialization. Hold whole input batches plus per-partition row-index lists, and only call `arrow::compute::interleave_record_batch` at spill or final-write time, producing output batches sized up to the configured `batch_size`. In the same change, the per-task fixed-byte memory budget is replaced with a `MemoryReservation` against the runtime memory pool, so the sort-shuffle writer participates in the same memory accounting as every other DataFusion operator.

# What changes are included in this PR?

**Core writer rewrite** (`ballista/core/src/execution_plans/sort_shuffle/`):

- New `BufferedBatches` type holds whole input `RecordBatch`es plus per-partition `Vec<(batch_idx, row_idx)>` index lists. Replaces `PartitionBuffer`.
- New `PartitionedBatchIterator` materializes per-partition rows into `batch_size`-shaped output batches via `interleave_record_batch`. This is the only path that copies row data, and it happens once per output batch (not once per input × output pair).
- New private `compute_partition_indices` helper inlines DataFusion's hash logic (`evaluate_expressions_to_arrays` + `create_hashes` + `REPARTITION_RANDOM_STATE`).
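The core of the scheme (hash each row to a partition, buffer only `(batch_idx, row_idx)` pairs, and gather rows into `batch_size`-shaped outputs at write time) can be sketched in std-only Rust. All names here are illustrative rather than Ballista's actual API, and std's `DefaultHasher` stands in for DataFusion's `create_hashes`:

```rust
// Std-only sketch of the defer-materialization idea. Names are
// illustrative, not Ballista's API; std's DefaultHasher stands in
// for DataFusion's create_hashes.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in for a single-column RecordBatch.
type Batch = Vec<i64>;

/// Hash every row of `batch` to a partition and record only the
/// (batch_idx, row_idx) pair; no row data is copied here.
fn index_batch(batch: &Batch, batch_idx: usize, indices: &mut Vec<Vec<(usize, usize)>>) {
    let n = indices.len() as u64;
    for (row_idx, key) in batch.iter().enumerate() {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        indices[(h.finish() % n) as usize].push((batch_idx, row_idx));
    }
}

/// Gather one partition's rows out of the buffered batches into
/// batch_size-shaped outputs: the analogue of calling
/// interleave_record_batch once per *output* batch.
fn materialize(batches: &[Batch], indices: &[(usize, usize)], batch_size: usize) -> Vec<Batch> {
    indices
        .chunks(batch_size)
        .map(|chunk| chunk.iter().map(|&(b, r)| batches[b][r]).collect())
        .collect()
}
```

Row data is copied exactly once, when `materialize` runs at spill or final-write time, and output batches are shaped by `batch_size` rather than by how many partitions sliced the input.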
- A unit test cross-checks per-row partition assignments against `BatchPartitioner::Hash` to catch hash-semantics drift if DataFusion's `REPARTITION_RANDOM_STATE` ever changes.
- Spill is now all-or-nothing, triggered by `MemoryReservation::try_grow()` failure against `RuntimeEnv::memory_pool` (`MemoryConsumer::with_can_spill(true)`). The previous greedy "spill the largest buffer" strategy against a fixed-byte threshold is removed.
- `SpillManager::spill` becomes per-batch (`(partition_id, &RecordBatch) -> Result<u64>`); the schema moves to the constructor.
- `finalize_output` materializes the in-memory remainder for each partition through `PartitionedBatchIterator`. The Arrow IPC `FileWriter` + `data.arrow.index` output format is unchanged — the reader is untouched.
- A new memory-pressure round-trip test exercises spilling under a tightly bounded `FairSpillPool` and verifies (a) `spill_count > 0`, (b) the round-trip key set matches the input, (c) `pool.reserved() == 0` after finalization.

**Configuration cleanup**:

- `SortShuffleConfig` shrinks to `enabled`, `compression`, and `batch_size` (dropping `buffer_size`, `memory_limit`, `spill_threshold`, and `spill_memory_threshold()`).
- The three corresponding `BALLISTA_SHUFFLE_SORT_BASED_*` session keys and their `BallistaConfig` accessors are removed.
- The matching three fields on the `SortShuffleWriterExecNode` protobuf message are dropped (field number `8` for `batch_size` is preserved; leaving a gap on the wire is fine).

**Standalone benchmark binary** (`benchmarks/src/bin/shuffle_bench.rs`):

- Replaced with a Comet-style runner that streams from real Parquet input via `read_parquet`. Adds `--writer hash|sort` to select either shuffle writer, `--memory-limit` wired through `RuntimeEnvBuilder::with_memory_limit` (which now actually drives sort-shuffle spilling), `--concurrent-tasks` for parallelism simulation, and a warmup + iteration loop with a detailed metrics dump.
- Switched from `structopt` (deprecated) to `clap`.
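For reference, wiring a memory limit into the runtime looks roughly like the fragment below. This is a sketch assuming DataFusion's `RuntimeEnvBuilder` API; the 512 MiB cap and the surrounding code are illustrative, not taken from this PR:

```rust
// Sketch only: assumes DataFusion's RuntimeEnvBuilder API.
// The sort-shuffle writer now reserves from this runtime pool,
// so this cap is what ultimately triggers spilling.
use datafusion::execution::runtime_env::RuntimeEnvBuilder;

let runtime_env = RuntimeEnvBuilder::new()
    // Pool capped at 512 MiB; 1.0 = let consumers use all of it.
    .with_memory_limit(512 * 1024 * 1024, 1.0)
    .build_arc()?;
```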
- The criterion bench at `benchmarks/benches/sort_shuffle.rs` is also updated to wire `--memory-limit` through `RuntimeEnvBuilder`, so the `bench_with_spill` group actually exercises spilling under the new model.

# Are there any user-facing changes?

Yes.

**Breaking changes**:

- Three session-config keys are removed: `ballista.shuffle.sort_based.buffer_size`, `ballista.shuffle.sort_based.memory_limit`, and `ballista.shuffle.sort_based.spill_threshold`. Spill is now governed by the runtime memory pool (`RuntimeEnv::memory_pool`) — set a memory limit via `RuntimeEnvBuilder::with_memory_limit` instead.
- `SortShuffleConfig::new` is now a 3-arg constructor (`enabled`, `compression`, `batch_size`).
- The `SortShuffleWriterExecNode` protobuf message drops fields 5/6/7 (`buffer_size`, `memory_limit`, `spill_threshold`).
- `PartitionBuffer` is replaced by the new `BufferedBatches`.
- `SpillManager::new` takes a `SchemaRef`; `SpillManager::spill` takes a single `&RecordBatch`.

The shuffle output file format is unchanged — readers do not need to change.
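The pool-governed, all-or-nothing spill behavior described above can be illustrated with a std-only toy analogue. The types and policy here are simplified stand-ins for DataFusion's `MemoryReservation` and memory pool, not the PR's code:

```rust
// Toy analogue of the MemoryReservation-driven spill trigger.
// Simplified stand-ins, not DataFusion's types: grow the reservation
// per buffered batch; if the pool refuses, spill *everything* and
// release the whole reservation (all-or-nothing).
struct Reservation {
    pool_limit: usize, // stand-in for the runtime memory pool's cap
    reserved: usize,
}

impl Reservation {
    fn try_grow(&mut self, bytes: usize) -> Result<(), ()> {
        if self.reserved + bytes > self.pool_limit {
            Err(()) // pool exhausted: the caller must spill
        } else {
            self.reserved += bytes;
            Ok(())
        }
    }
    fn free(&mut self) {
        self.reserved = 0;
    }
}

/// Try to buffer `batch_bytes`; on memory pressure, spill every byte
/// buffered so far plus the incoming batch and return the spill size.
fn buffer_or_spill(res: &mut Reservation, buffered: &mut usize, batch_bytes: usize) -> usize {
    match res.try_grow(batch_bytes) {
        Ok(()) => {
            *buffered += batch_bytes;
            0
        }
        Err(()) => {
            let spilled = *buffered + batch_bytes;
            *buffered = 0;
            res.free(); // the pool's reserved count drops back to zero
            spilled
        }
    }
}
```

After a spill, the reservation is fully released, which is the invariant the new round-trip test checks via `pool.reserved() == 0` after finalization.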
