hcrosse opened a new pull request, #1537: URL: https://github.com/apache/datafusion-ballista/pull/1537
## Which issue does this PR close?

Closes #1387.

## Rationale

The shuffle writer performs synchronous `std::fs` I/O inside async task contexts. Under concurrent shuffle workloads, this blocks tokio worker threads and degrades runtime responsiveness for co-located async tasks.

## What changes are included in this PR?

Both the hash repartition path in `shuffle_writer.rs` and the no-repartition path in `utils::write_stream_to_disk` now use a bounded `mpsc::channel(2)` to bridge the async record-batch stream to a `spawn_blocking` task that owns all file I/O. This is the same pattern already used in `flight_service.rs`.

- `shuffle_writer.rs`: hash repartition path rewritten with bounded channel + `spawn_blocking`, `create_dir_all` converted to `tokio::fs`, double-counting of `write_time` removed from the no-repartition path, two write-failure regression tests added
- `utils.rs`: `write_stream_to_disk` rewritten with bounded channel + `spawn_blocking`
- `ballista/core/Cargo.toml`: added the `"fs"` feature to the tokio dependency
- `shuffle_bench.rs`: added a `--concurrency` flag for measuring concurrent partition execution, modeling how the Ballista executor dispatches tasks via `Semaphore(task_slots)`

## Benchmark results

All numbers are from `shuffle_bench` with 3 iterations averaged, targeting local tmpfs. Format is before -> after (delta).

Default configuration (8 input partitions, 100 output partitions):

| Rows | Concurrency 1 | Concurrency 4 | Concurrency 8 |
|------|---------------|---------------|---------------|
| 1,000,000 | - | 114 -> 97 (-14.4%) | 98 -> 93 (-4.7%) |
| 5,000,000 | 648 -> 650 (+0.4%) | 284 -> 272 (-4.4%) | 291 -> 293 (+0.9%) |
| 10,000,000 | - | 498 -> 487 (-2.4%) | - |

Output partition sensitivity (5,000,000 rows, 8 input partitions):

| Output Part. | Concurrency 4 | Concurrency 8 |
|--------------|---------------|---------------|
| 50 | 155 -> 164 (+6.3%) | - |
| 100 | 284 -> 272 (-4.4%) | 291 -> 293 (+0.9%) |
| 200 | 486 -> 491 (+1.0%) | 475 -> 473 (-0.4%) |

Input partition sensitivity (5,000,000 rows, concurrency 4, 100 output partitions):

| Input Part. | Before -> After |
|-------------|-----------------|
| 4 | 238 -> 241 (+1.2%) |
| 8 | 284 -> 272 (-4.4%) |

Throughput results are mixed, which is expected when the I/O target is local tmpfs with near-zero latency: in this scenario the channel indirection adds overhead that roughly offsets the benefit of freeing tokio workers. The benefit shows most clearly at moderate concurrency (c=4) with 8 input partitions, where the tokio threadpool has enough in-flight tasks to take advantage of freed workers. At c=1 there is no contention, so no benefit; at c=8, I/O bandwidth saturates and the channel overhead roughly cancels out the worker-freeing benefit. The slight regression at p=50 reflects the smaller per-batch partition cost making the channel overhead proportionally larger.

The primary benefit is runtime health rather than raw throughput: keeping tokio workers unblocked prevents latency spikes in co-located async tasks. A separate runtime-probe microbenchmark confirmed that inline sync I/O causes 3-8 second probe latencies vs 2-8 ms with `spawn_blocking`, even when aggregate throughput is similar.

## Future work

`BatchPartitioner` currently exposes only a synchronous iterator-based API, so both the partitioning and the I/O must happen together inside `spawn_blocking`. An async-friendly partitioner API in DataFusion would let Ballista move the partitioning step back to the async side, narrowing the `spawn_blocking` scope to just the `StreamWriter` I/O. This would be an upstream DataFusion change; I plan to file an issue there as a follow-up.

## Are these changes tested?

All existing `shuffle_writer` tests pass.
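As background for these tests, here is a minimal std-only sketch of the bounded-channel bridge, including how an I/O error on the blocking side propagates back to the caller. The real implementation uses `tokio::sync::mpsc::channel(2)` and `tokio::task::spawn_blocking`; the `write_batches` function, `Batch` alias, and `FailingSink` below are hypothetical stand-ins for illustration, not Ballista code.

```rust
use std::io::{self, Write};
use std::sync::mpsc::sync_channel;
use std::thread;

// Hypothetical stand-in for an Arrow record batch.
type Batch = Vec<u8>;

// A sink whose writes always fail, for exercising error propagation.
pub struct FailingSink;

impl Write for FailingSink {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(io::Error::new(io::ErrorKind::Other, "disk full"))
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Bridge a sequence of batches to a writer thread over a bounded
/// channel. The producer blocks when the channel is full (backpressure),
/// and a write error on the I/O side is returned to the caller.
pub fn write_batches<W: Write + Send + 'static>(
    batches: Vec<Batch>,
    mut sink: W,
) -> io::Result<usize> {
    // Capacity 2, mirroring `mpsc::channel(2)` in the async version.
    let (tx, rx) = sync_channel::<Batch>(2);

    // The `spawn_blocking` side: owns all the I/O.
    let writer = thread::spawn(move || -> io::Result<usize> {
        let mut written = 0;
        for batch in rx {
            sink.write_all(&batch)?;
            written += batch.len();
        }
        Ok(written)
    });

    // The "async" side: feed batches. A send error means the writer
    // hung up early, e.g. after an I/O failure.
    for batch in batches {
        if tx.send(batch).is_err() {
            break;
        }
    }
    drop(tx); // close the channel so the writer's loop terminates

    writer.join().expect("writer thread panicked")
}

fn main() {
    // Happy path: all bytes reach the sink.
    let n = write_batches(vec![vec![1, 2, 3], vec![4, 5]], Vec::<u8>::new()).unwrap();
    println!("wrote {} bytes", n);

    // Error path: the write failure surfaces as an Err to the caller.
    assert!(write_batches(vec![vec![0u8; 8]], FailingSink).is_err());
}
```

The bounded capacity is what gives backpressure: the producer cannot race ahead of the writer by more than two batches, so memory stays bounded even when the disk is slow.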
Two new regression tests verify that errors propagate correctly through the bounded channel for both the no-repartition and hash repartition paths. The `shuffle_bench` binary now accepts a `--concurrency` flag for validating concurrent execution behavior.

## Are there any user-facing changes?

No. This is an internal change to the shuffle writer execution path.
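To illustrate the dispatch model that the `--concurrency` flag imitates, here is a std-only sketch that bounds in-flight tasks using a channel as a semaphore. The executor itself uses `tokio::sync::Semaphore`; `run_bounded` and `demo` are hypothetical names for this sketch, not actual benchmark code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Run `tasks`, allowing at most `slots` to execute at once, mimicking
/// dispatch through `Semaphore(task_slots)`.
pub fn run_bounded(tasks: Vec<Box<dyn FnOnce() + Send>>, slots: usize) {
    // A bounded channel doubles as a semaphore: sending a token acquires
    // a slot (blocking once `slots` tokens are outstanding) and receiving
    // a token releases it.
    let (permit_tx, permit_rx) = sync_channel::<()>(slots);
    let permit_rx = Arc::new(Mutex::new(permit_rx));

    let mut handles = Vec::new();
    for task in tasks {
        permit_tx.send(()).unwrap(); // acquire a slot
        let permit_rx = Arc::clone(&permit_rx);
        handles.push(thread::spawn(move || {
            task();
            permit_rx.lock().unwrap().recv().unwrap(); // release the slot
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}

/// Run `n_tasks` short tasks at the given concurrency and report
/// (tasks completed, peak observed concurrency).
pub fn demo(n_tasks: usize, slots: usize) -> (usize, usize) {
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let done = Arc::new(AtomicUsize::new(0));

    let tasks: Vec<Box<dyn FnOnce() + Send>> = (0..n_tasks)
        .map(|_| {
            let (running, peak, done) =
                (Arc::clone(&running), Arc::clone(&peak), Arc::clone(&done));
            Box::new(move || {
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5));
                running.fetch_sub(1, Ordering::SeqCst);
                done.fetch_add(1, Ordering::SeqCst);
            }) as Box<dyn FnOnce() + Send>
        })
        .collect();

    run_bounded(tasks, slots);
    (done.load(Ordering::SeqCst), peak.load(Ordering::SeqCst))
}

fn main() {
    let (completed, peak) = demo(8, 3);
    assert_eq!(completed, 8);
    assert!(peak <= 3); // never more than `slots` tasks in flight
    println!("completed {} tasks, peak concurrency {}", completed, peak);
}
```

Because a permit is acquired before each task is spawned and released only after it finishes, the peak observed concurrency can never exceed `slots`, matching the executor's task-slot semantics.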
