jja725 opened a new issue, #1539: URL: https://github.com/apache/datafusion-ballista/issues/1539
## Is your feature request related to a problem or challenge?

Ballista currently stores shuffle data on local executor disks and serves it via Arrow Flight between executors. This creates several limitations:

- **Fault tolerance**: If an executor crashes, its shuffle data is lost and the entire job must restart from the beginning.
- **Resource coupling**: Compute (executor CPU/memory) and shuffle storage are tightly coupled. Executors must remain alive for the full duration of the job to serve shuffle data to downstream stages.
- **Scalability**: With M map tasks and N reduce tasks, executors must hold shuffle data for all downstream consumers, which limits independent scaling of compute vs. shuffle storage.
- **Elasticity**: Executors cannot be reclaimed or scaled down between stages because they are still serving shuffle data.

## Describe the solution you'd like

Add support for an **external Remote Shuffle Service (RSS)** as a pluggable shuffle storage and serving layer. Instead of writing shuffle data to local executor disks and serving it via Flight, executors would push shuffle data to a dedicated external service, and reduce tasks would pull from it.

Popular open-source RSS implementations that could serve as integration targets:

- **[Apache Celeborn](https://celeborn.apache.org/)** (formerly Alibaba RSS) — push-based, supports Spark/Flink/MR
- **[Apache Uniffle](https://uniffle.apache.org/)** (formerly Tencent RSS) — supports multiple compute engines

### High-level design ideas

1. **Pluggable shuffle writer**: Introduce a `ShuffleWriter` trait/abstraction so the current local-disk writer and a new RSS writer can be swapped via configuration.
2. **RSS push on map completion**: After a map stage task finishes, shuffle blocks are pushed to the RSS instead of being left on local disk.
3. **RSS pull on reduce**: The shuffle reader fetches partitions from the RSS endpoint rather than connecting back to the producing executor via Flight.
4. **Scheduler awareness**: The scheduler should not need to track partition locations per executor; instead, it queries the RSS for partition locations.

### Benefits

- **Better fault tolerance**: Shuffle data survives executor failures.
- **Executor elasticity**: Executors can be released after map stages complete.
- **Decoupled scaling**: Shuffle storage and compute can scale independently.
- **Reduced network pressure on executors**: Dedicated shuffle servers can handle merging and serving more efficiently.

## Alternatives considered

- **Issue #1151** proposes streaming push-based shuffle (in-memory, direct executor-to-executor). This is complementary but different — it targets low-latency queries where stages overlap, whereas RSS targets fault tolerance and resource elasticity for batch workloads.
- Continuing to improve the existing local-disk + Flight approach (see #1319) is useful but does not address the fault tolerance or elasticity gaps.

## Additional context

- Apache Spark has supported an external shuffle service since Spark 1.2 and introduced push-based shuffle in Spark 3.2 (SPARK-30602).
- Flink supports a pluggable shuffle service via `ShuffleServiceFactory`.
- Presto/Trino have ongoing work on native spooling shuffle.

Is there community interest in this direction? Happy to help with design, prototyping, or integration work.
