andygrove opened a new issue, #1643: URL: https://github.com/apache/datafusion-ballista/issues/1643
## Summary

When running TPC-H Q2 at SF1000 with 32 executors (8 concurrent tasks each, 64 partitions), the query starts with all executors busy but quickly drops to a single busy executor for a significant portion of the query. The root cause is hash-partitioning intermediate shuffle data on low-cardinality columns (`n_regionkey` with 5 distinct values, `n_nationkey` with 25 distinct values) into 64 buckets.

## Environment

- Ballista with 32 executors, 8 concurrent tasks each
- 64 hash partitions
- TPC-H SF1000, Parquet/zstd format
- PushStaged scheduling policy

## Analysis

### Stage execution flow

The query is broken into stages 3–17. The critical path is:

| Stage | Operation | Shuffle Key | Duration |
|-------|-----------|-------------|----------|
| 13 | Scan `partsupp` (800M rows) | `Hash(ps_suppkey, 64)` | ~4s |
| 3 | Scan `supplier` (10M rows) | `Hash(s_suppkey, 64)` | <1s |
| 6 | HashJoin `part ⋈ partsupp` | `Hash(ps_suppkey, 64)` | ~2s |
| 14 | Join supplier+partsupp | `Hash(s_nationkey, 64)` | ~2s |
| **15** | **HashJoin `nation ⋈ (supplier+partsupp)`** | **`Hash(n_regionkey, 64)`** | **~9s** |
| **16** | **HashJoin `region` + partial agg** | `Hash(ps_partkey, 64)` | ~3s |
| 17 | Final agg `min(ps_supplycost)` | `Hash(ps_partkey, 64)` | ~1s |

### The bottleneck: Stage 15 data skew

Stage 15 joins `nation` (25 rows) with the supplier+partsupp result (800M rows) on `n_nationkey`, then repartitions the output by `Hash(n_regionkey, 64)`. Since `n_regionkey` has only **5 distinct values** mapped into 64 hash buckets, only 5 of the 64 partitions receive any data; the rest are empty. Observed row distribution on one executor:

| Partition | Input Rows | Write Time |
|-----------|------------|------------|
| 56, 57, 59, 60, 62 | 0 | ~100µs each |
| 58 | 31.96M | 2.10s |
| 61 | 63.93M | 5.53s |

The wall-clock skew ratio is **8670x** (8.7s max vs. ~0s min).
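The structural skew can be reproduced with a minimal Rust sketch. This is not Ballista's actual repartitioning code (the `bucket` helper and the use of `DefaultHasher` are illustrative); it only demonstrates that 5 distinct key values can never populate more than 5 of 64 hash buckets, regardless of the hash function:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

// Illustrative hash-partition assignment: map a key to one of
// `num_partitions` buckets. Ballista's real hash function differs,
// but the cardinality argument below holds for any hash function.
fn bucket(key: i32, num_partitions: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % num_partitions
}

fn main() {
    // n_regionkey has 5 distinct values (0..=4), hashed into 64 partitions.
    let non_empty: HashSet<u64> = (0..5).map(|k| bucket(k, 64)).collect();
    println!("non-empty partitions: {} of 64", non_empty.len());
    // At most 5 partitions can ever receive rows; at least 59 are
    // guaranteed empty, so most executors have nothing to do.
    assert!(non_empty.len() <= 5);
}
```

Every row of the 800M-row intermediate result lands in one of those few buckets, which is exactly the 0 vs. 63.93M row distribution observed above.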
### Stage 16 compounds the problem

Stage 16 joins `region` (1 row after filtering, e.g., `r_name = 'EUROPE'`) with stage 15 output on `r_regionkey`. Only 1 of the 5 non-empty regionkey partitions produces output, but all partitions still read their full shuffle data to perform the hash join, spending seconds reading millions of rows only to discard them.

### 27-second idle gap

After this executor finishes its stage 15/16 partitions at `15:48:25`, it sits completely idle until `15:48:52` (27 seconds), waiting for the straggler executor(s) that received the heaviest `n_regionkey` partitions. This idle time alone accounts for **51% of the total executor wall time**:

```
Total wall time:  51.8s
Actual work:      25.2s
Idle (blocked):   26.5s
Utilization:      49%
```

### Shuffle write throughput is fine

The shuffle write mechanism itself performs at a consistent 10–16 Mrows/s across the heavy stages. The problem is not shuffle I/O; it is the skewed partition assignment.

### Four 23.87 GB shuffles of the same 800M-row dataset

The `ShuffleReaderExec` stats show four distinct 800M-row / 23.87 GB shuffle reads flowing through the query plan. The `partsupp` and `supplier+partsupp` intermediate data is reshuffled multiple times through the join tree.

## Suggested improvements

1. **Broadcast join for small dimension tables**: `nation` (25 rows) and `region` (5 rows) should be broadcast rather than hash-partitioned. This eliminates the skewed `Hash(n_regionkey, 64)` shuffle entirely.
2. **Push the region filter earlier**: If the region filter (`r_name = 'EUROPE'`) is pushed down before the nation join, only ~5 nations participate, reducing intermediate data by ~4/5.
3. **Detect low-cardinality partition keys**: The planner could check column statistics (NDV) before choosing a hash partition key. When NDV << partition count, it should prefer an alternative key or switch to broadcast.
4. **Reduce redundant reshuffles**: The 800M-row supplier+partsupp result is shuffled 4 times through the join tree.
Reordering joins or combining stages could reduce this.
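Suggestions 1 and 3 could be combined into a single planner heuristic. The sketch below is hypothetical: `choose_strategy`, `DistributionStrategy`, and the thresholds are illustrative names and values, not existing DataFusion/Ballista APIs. It only shows the decision shape: consult build-side size and key NDV from statistics, and fall back to broadcast when hash partitioning would leave most partitions empty:

```rust
/// Hypothetical distribution choice for one side of a join.
#[derive(Debug, PartialEq)]
enum DistributionStrategy {
    HashPartition,
    Broadcast,
}

/// Illustrative heuristic: broadcast when the build side is small,
/// or when the partition key's NDV is far below the partition count
/// (which would leave most partitions empty and a few heavily skewed).
fn choose_strategy(
    build_side_rows: u64,
    key_ndv: u64,
    num_partitions: u64,
    broadcast_row_limit: u64,
) -> DistributionStrategy {
    if build_side_rows <= broadcast_row_limit || key_ndv * 2 < num_partitions {
        DistributionStrategy::Broadcast
    } else {
        DistributionStrategy::HashPartition
    }
}

fn main() {
    // nation: 25 rows, n_regionkey NDV = 5, 64 partitions -> broadcast,
    // which would remove the skewed Hash(n_regionkey, 64) shuffle in stage 15.
    assert_eq!(
        choose_strategy(25, 5, 64, 10_000),
        DistributionStrategy::Broadcast
    );
    // partsupp-sized input with high-NDV key -> hash partitioning is fine.
    assert_eq!(
        choose_strategy(800_000_000, 200_000_000, 64, 10_000),
        DistributionStrategy::HashPartition
    );
}
```

With the Q2 statistics above, both the row-count and NDV checks would route `nation` and `region` to broadcast, avoiding the stage 15 bottleneck entirely.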
