andygrove opened a new pull request, #1636: URL: https://github.com/apache/datafusion-ballista/pull/1636
# Which issue does this PR close?

Closes #.

# Rationale for this change

The sort shuffle writer's spill decision is currently gated by `MemoryReservation::try_grow()` against the runtime `MemoryPool`. When the executor is started without `--memory-pool-size`, the pool is unbounded and `try_grow` never fails, so the writer never spills and just keeps buffering input batches in memory until the input stream finishes.

On TPC-H Q3 against SF100 with `--partitions 2`, each lineitem stage-4 writer task buffers ~20M rows × ~80 bytes ≈ 1.6 GB. With 8 concurrent task slots that is ~13 GB peak for stage 4 alone, plus stages 1, 2, and 3 running in parallel. Measured executor RSS climbed to ~24.6 GB, the OS started swapping/compressing pages, and Q3 went from ~13s (hash shuffle) to ~49s (sort shuffle).

Higher partition counts hide the issue because each writer's per-partition buffer ends up smaller, but the regression is severe at low partition counts. Since the v53 default flipped to sort shuffle (#1623), users hitting this case have to know to set `--memory-pool-size` to get reasonable behaviour.

# What changes are included in this PR?

- New config `ballista.shuffle.sort_based.memory_limit_per_task_bytes` (default 256 MB).
- `SortShuffleConfig` carries the limit, plumbed from `BallistaConfig` through the planner.
- `SortShuffleWriterExec` tracks its own buffered-bytes counter and spills when the counter crosses the configured threshold, independently of the runtime `MemoryPool`. The `MemoryReservation` registration is kept (best-effort `try_grow`) so the pool still sees this writer as a memory citizen for visibility purposes.
- Existing spill round-trip tests are rewired to drive spilling via the new per-task budget rather than via a tight `FairSpillPool` size; the pool size in tests is now generous, so the post-test `pool.reserved() == 0` assertion still validates that the writer releases its reservation.
- Total worst-case sort shuffle memory per executor is approximately `concurrent_tasks * memory_limit_per_task_bytes`. At the default 256 MB × 8 slots that is ~2 GB, which is safe on a laptop running multiple executors and still leaves the runtime pool free for other operators.

# Are there any user-facing changes?

A new configuration key `ballista.shuffle.sort_based.memory_limit_per_task_bytes` is added with a default of 256 MB. Users running queries that previously fit comfortably in unbounded memory may now see spilling; set this higher if avoiding spills matters more than bounding memory.

## Verification

TPC-H Q3 against `/opt/tpch/sf100`, executor started with `--concurrent-tasks 8` and **no** `--memory-pool-size`:

| `--partitions` | Sort before | Sort after | Hash (unchanged) |
| --- | --- | --- | --- |
| 2 | 49.1s (~24 GB RSS) | 21.8s (~4 GB RSS) | 13.4s |
| 4 | 14.1s | 9.3s | 8.5s |
| 8 | 7.7s | 7.9s | 7.3s |
| 16 | 6.3s | 8.0s | 11.0s |

The catastrophic regression at low partition counts is gone; sort shuffle now stays within a few seconds of hash shuffle at 2 and 4 partitions and continues to win at higher counts.

`cargo test -p ballista-core --lib execution_plans::sort_shuffle::` passes (29/29) and `cargo test -p ballista --test sort_shuffle` passes (47/47).
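The per-task spill decision described in the changes above can be sketched roughly as follows. This is a minimal illustration of the counter-crosses-threshold idea, not the actual `SortShuffleWriterExec` code; the type and field names here are hypothetical:

```rust
// Hypothetical sketch of a per-task buffered-bytes spill trigger.
// The real writer sorts and writes buffered batches to disk on spill;
// here we only model the counter and the threshold check.
struct SortShuffleBuffer {
    buffered_bytes: usize,        // bytes currently buffered in memory
    memory_limit_per_task: usize, // e.g. 256 * 1024 * 1024 (the 256 MB default)
    spills: usize,                // number of spill events, for illustration
}

impl SortShuffleBuffer {
    fn new(memory_limit_per_task: usize) -> Self {
        Self { buffered_bytes: 0, memory_limit_per_task, spills: 0 }
    }

    /// Buffer one incoming batch; spill when the counter crosses the
    /// configured per-task threshold, independently of the runtime pool.
    fn push_batch(&mut self, batch_bytes: usize) {
        self.buffered_bytes += batch_bytes;
        if self.buffered_bytes >= self.memory_limit_per_task {
            self.spill();
        }
    }

    fn spill(&mut self) {
        // Stand-in for sorting + writing to disk, then releasing memory.
        self.spills += 1;
        self.buffered_bytes = 0;
    }
}
```

The key property is that the threshold check depends only on the writer's own counter, so it fires even when the runtime `MemoryPool` is unbounded and `try_grow` never fails.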

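As a quick sanity check of the worst-case sizing formula `concurrent_tasks * memory_limit_per_task_bytes` discussed above (an illustrative helper, not part of the Ballista codebase):

```rust
/// Worst-case sort shuffle memory per executor: every concurrent task
/// may buffer up to its full per-task limit at the same time.
/// (Illustrative helper, not part of the Ballista codebase.)
fn worst_case_sort_shuffle_bytes(concurrent_tasks: u64, limit_per_task_bytes: u64) -> u64 {
    concurrent_tasks * limit_per_task_bytes
}
// With the defaults above: 8 task slots * 256 MB = 2 GiB per executor.
```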