andygrove opened a new pull request, #1636:
URL: https://github.com/apache/datafusion-ballista/pull/1636

   # Which issue does this PR close?
   
   Closes #.
   
   # Rationale for this change
   
   The sort shuffle writer's spill decision is currently gated by 
`MemoryReservation::try_grow()` against the runtime `MemoryPool`. When the 
executor is started without `--memory-pool-size`, the pool is unbounded and 
`try_grow` never fails, so the writer never spills and just keeps buffering 
input batches in memory until the input stream finishes.
   
   On TPC-H Q3 against SF100 with `--partitions 2`, each lineitem stage-4 
writer task buffers ~20M rows × ~80 bytes ≈ 1.6 GB. With 8 concurrent task 
slots that's ~13 GB peak just for stage 4, plus stages 1, 2, 3 running in 
parallel. Measured executor RSS climbed to ~24.6 GB, the OS started 
swapping/compressing pages, and Q3 went from ~13s (hash shuffle) to ~49s (sort 
shuffle). Higher partition counts hide the issue because each writer's 
per-partition buffer ends up smaller, but the regression is severe at low 
partition counts.
   
   Since the v53 default flipped to sort shuffle (#1623), users hitting this 
case have to know to set `--memory-pool-size` to get reasonable behaviour.
   
   # What changes are included in this PR?
   
   - New config `ballista.shuffle.sort_based.memory_limit_per_task_bytes` 
(default 256 MB).
   - `SortShuffleConfig` carries the limit, plumbed from `BallistaConfig` 
through the planner.
   - `SortShuffleWriterExec` tracks its own buffered-bytes counter and spills 
when the counter crosses the configured threshold, independently of the runtime 
`MemoryPool`. The `MemoryReservation` registration is kept (best-effort 
`try_grow`), so the writer's memory usage remains visible to the runtime pool 
for accounting purposes.
   - Existing spill round-trip tests rewired to drive spilling via the new 
per-task budget rather than via a tight `FairSpillPool` size; pool size in 
tests is now generous so the post-test `pool.reserved() == 0` assertion still 
validates that the writer releases its reservation.
   - Total worst-case sort shuffle memory per executor is approximately 
`concurrent_tasks * memory_limit_per_task_bytes`. At the default 256 MB × 8 
slots that's ~2 GB, which is safe on a laptop running multiple executors and 
still leaves the runtime pool free for other operators.
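
   The per-task budget check can be sketched as follows (illustrative names 
and structure only, not the actual PR code): the writer accumulates a 
buffered-bytes counter and spills as soon as it crosses the configured limit, 
with no dependence on the runtime `MemoryPool`.

```rust
/// Hypothetical default matching the new config's 256 MB default.
const DEFAULT_MEMORY_LIMIT_PER_TASK: usize = 256 * 1024 * 1024;

struct SortShuffleBuffer {
    buffered_bytes: usize,
    limit: usize,
    spill_count: usize,
}

impl SortShuffleBuffer {
    fn new(limit: usize) -> Self {
        Self { buffered_bytes: 0, limit, spill_count: 0 }
    }

    /// Track a newly buffered batch; spill once the per-task budget is
    /// crossed, independently of the runtime MemoryPool.
    fn push_batch(&mut self, batch_bytes: usize) {
        self.buffered_bytes += batch_bytes;
        if self.buffered_bytes >= self.limit {
            self.spill();
        }
    }

    fn spill(&mut self) {
        self.spill_count += 1;
        self.buffered_bytes = 0; // buffered rows written to disk, counter reset
    }
}

fn main() {
    let mut buf = SortShuffleBuffer::new(DEFAULT_MEMORY_LIMIT_PER_TASK);
    // Feed ~1.6 GB of input as 8 MB batches against the 256 MB budget.
    for _ in 0..200 {
        buf.push_batch(8 * 1024 * 1024);
    }
    println!("spills = {}", buf.spill_count);
}
```

With a 256 MB budget and 8 MB batches, a spill fires every 32 batches, so 
~1.6 GB of input produces 6 spills and leaves at most one budget's worth of 
rows in memory, which is the bounded worst case described above.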
   
   # Are there any user-facing changes?
   
   A new configuration key 
`ballista.shuffle.sort_based.memory_limit_per_task_bytes` is added with a 
default of 256 MB. Users running queries that previously fit comfortably in 
unbounded memory may now see spilling — set this higher if avoiding spills 
matters more than bounding memory.
   
   ## Verification
   
   TPC-H Q3 against `/opt/tpch/sf100`, executor started with 
`--concurrent-tasks 8` and **no** `--memory-pool-size`:
   
   | `--partitions` | Sort before | Sort after | Hash (unchanged) |
   | --- | --- | --- | --- |
   | 2 | 49.1s (~24 GB RSS) | 21.8s (~4 GB RSS) | 13.4s |
   | 4 | 14.1s | 9.3s | 8.5s |
   | 8 | 7.7s | 7.9s | 7.3s |
   | 16 | 6.3s | 8.0s | 11.0s |
   
   The catastrophic regression at low partition counts is gone; sort shuffle 
now stays within a few seconds of hash shuffle at 2 and 4 partitions and 
continues to win at higher counts.
   
   `cargo test -p ballista-core --lib execution_plans::sort_shuffle::` passes 
(29/29) and `cargo test -p ballista --test sort_shuffle` passes (47/47).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

