hcrosse opened a new pull request, #1537:
URL: https://github.com/apache/datafusion-ballista/pull/1537

   ## Which issue does this PR close?
   
   Closes #1387.
   
   ## Rationale
   
   The shuffle writer performs synchronous `std::fs` I/O inside async task 
contexts. Under concurrent shuffle workloads, this blocks tokio worker threads 
and degrades runtime responsiveness for co-located async tasks.
   
   ## What changes are included in this PR?
   
   Both the hash repartition path in `shuffle_writer.rs` and the no-repartition 
path in `utils::write_stream_to_disk` now use a bounded `mpsc::channel(2)` to 
bridge the async record-batch stream to a `spawn_blocking` task that owns all 
file I/O. This is the same pattern already used in `flight_service.rs`.
   
   - `shuffle_writer.rs`: hash repartition path rewritten with bounded-channel 
+ `spawn_blocking`, `create_dir_all` converted to `tokio::fs`, removed 
double-counting of `write_time` in the no-repartition path, two write-failure 
regression tests added
   - `utils.rs`: `write_stream_to_disk` rewritten with bounded-channel + 
`spawn_blocking`
   - `ballista/core/Cargo.toml`: added `"fs"` feature to tokio dependency
   - `shuffle_bench.rs`: added `--concurrency` flag for measuring concurrent 
partition execution, modeling how the Ballista executor dispatches tasks via 
`Semaphore(task_slots)`
   
   ## Benchmark results
   
   All numbers from `shuffle_bench` with 3 iterations averaged, targeting local 
tmpfs. Format is before -> after (delta).
   
   Default configuration (8 input partitions, 100 output partitions):
   
   | Rows | Concurrency 1 | Concurrency 4 | Concurrency 8 |
   |------|---------------|---------------|---------------|
   | 1,000,000 | - | 114 -> 97 (-14.4%) | 98 -> 93 (-4.7%) |
   | 5,000,000 | 648 -> 650 (+0.4%) | 284 -> 272 (-4.4%) | 291 -> 293 (+0.9%) |
   | 10,000,000 | - | 498 -> 487 (-2.4%) | - |
   
   Output partition sensitivity (5,000,000 rows, 8 input partitions):
   
   | Output Part. | Concurrency 4 | Concurrency 8 |
   |--------------|---------------|---------------|
   | 50 | 155 -> 164 (+6.3%) | - |
   | 100 | 284 -> 272 (-4.4%) | 291 -> 293 (+0.9%) |
   | 200 | 486 -> 491 (+1.0%) | 475 -> 473 (-0.4%) |
   
   Input partition sensitivity (5,000,000 rows, concurrency 4, 100 output 
partitions):
   
   | Input Part. | Before -> After |
   |-------------|-----------------|
   | 4 | 238 -> 241 (+1.2%) |
   | 8 | 284 -> 272 (-4.4%) |
   
   Throughput results are mixed, which is expected when the I/O target is local 
tmpfs with near-zero latency. In this scenario the channel indirection adds 
overhead that roughly offsets the benefit of freeing tokio workers.
   
   The benefit shows most clearly at moderate concurrency (c=4) with 8 input 
partitions, where the tokio threadpool has enough in-flight tasks to take 
advantage of freed workers. At c=1 there is no contention so no benefit. At 
c=8, I/O bandwidth saturates and the channel overhead roughly cancels out the 
worker-freeing benefit. The slight regression at p=50 reflects the smaller 
per-batch partition cost making the channel overhead proportionally larger.
   
   The primary benefit is runtime health rather than raw throughput: keeping 
tokio workers unblocked prevents latency spikes in co-located async tasks. A 
separate runtime-probe microbenchmark confirmed that inline sync I/O causes 3-8 
second probe latencies vs 2-8 ms with `spawn_blocking`, even when aggregate 
throughput is similar.
   
   ## Future work
   
   `BatchPartitioner` currently exposes only a synchronous iterator-based API, 
so both the partitioning and the I/O must happen together inside 
`spawn_blocking`. An async-friendly partitioner API in DataFusion would let 
Ballista move the partitioning step back to the async side, narrowing the 
`spawn_blocking` scope to just the `StreamWriter` I/O. This would be a 
DataFusion upstream change; I plan to file an issue there as a follow-up.
   
   ## Are these changes tested?
   
   All existing `shuffle_writer` tests pass. Two new regression tests verify 
that errors propagate correctly through the bounded-channel for both the 
no-repartition and hash repartition paths. The `shuffle_bench` binary now 
accepts a `--concurrency` flag for validating concurrent execution behavior.
   
   ## Are there any user-facing changes?
   
   No. This is an internal change to the shuffle writer execution path.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to