andygrove opened a new issue, #1643:
URL: https://github.com/apache/datafusion-ballista/issues/1643

   ## Summary
   
   When running TPC-H Q2 at SF1000 with 32 executors (8 concurrent tasks each, 
64 partitions), the query starts with all executors busy but quickly drops to a 
single busy executor for a significant portion of the query. The root cause is 
hash-partitioning intermediate shuffle data on low-cardinality columns 
(`n_regionkey` with 5 distinct values, `n_nationkey` with 25 distinct values) 
into 64 buckets.
   
   ## Environment
   
   - Ballista with 32 executors, 8 concurrent tasks each
   - 64 hash partitions
   - TPC-H SF1000, Parquet/zstd format
   - PushStaged scheduling policy
   
   ## Analysis
   
   ### Stage execution flow
   
   The query is broken into stages 3–17. The critical path is:
   
   | Stage | Operation | Shuffle Key | Duration |
   |-------|-----------|-------------|----------|
   | 13 | Scan `partsupp` (800M rows) | `Hash(ps_suppkey, 64)` | ~4s |
   | 3 | Scan `supplier` (10M rows) | `Hash(s_suppkey, 64)` | <1s |
   | 6 | HashJoin `part ⋈ partsupp` | `Hash(ps_suppkey, 64)` | ~2s |
   | 14 | Join supplier+partsupp | `Hash(s_nationkey, 64)` | ~2s |
   | **15** | **HashJoin `nation ⋈ (supplier+partsupp)`** | **`Hash(n_regionkey, 64)`** | **~9s** |
   | **16** | **HashJoin `region` + partial agg** | `Hash(ps_partkey, 64)` | ~3s |
   | 17 | Final agg `min(ps_supplycost)` | `Hash(ps_partkey, 64)` | ~1s |
   
   ### The bottleneck: Stage 15 data skew
   
   Stage 15 joins `nation` (25 rows) with the supplier+partsupp result (800M 
rows) on `n_nationkey`, then repartitions the output by `Hash(n_regionkey, 64)`.
   
   Since `n_regionkey` has only **5 distinct values** mapped into 64 hash 
buckets, at most 5 of the 64 partitions receive any data; the rest are empty. 
Observed row distribution on one executor:
   
   | Partition | Input Rows | Write Time |
   |-----------|-----------|------------|
   | 56, 57, 59, 60, 62 | 0 | ~100µs each |
   | 58 | 31.96M | 2.10s |
   | 61 | 63.93M | 5.53s |
   
   The wall-clock skew ratio is **8670x** (8.7s max vs ~0s min).
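   The bucket starvation is independent of the hash function. A minimal sketch (Python's built-in `hash` for illustration, not DataFusion's actual hash function) shows that 5 distinct key values can never populate more than 5 of the 64 partitions:

   ```python
   # Illustrative only: Python's hash() is not the hash DataFusion uses,
   # but ANY hash of 5 distinct keys modulo 64 buckets can populate at
   # most 5 of the 64 partitions -- the other 59+ receive nothing.
   NUM_PARTITIONS = 64
   region_keys = [0, 1, 2, 3, 4]  # n_regionkey has 5 distinct values

   buckets = {hash(k) % NUM_PARTITIONS for k in region_keys}
   print(f"non-empty partitions: {sorted(buckets)}")
   print(f"empty partitions: {NUM_PARTITIONS - len(buckets)} of {NUM_PARTITIONS}")
   ```

   With collisions the situation only gets worse, concentrating even more rows into fewer buckets.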
   
   ### Stage 16 compounds the problem
   
   Stage 16 joins `region` (1 row after filtering, e.g., `r_name = 'EUROPE'`) 
with stage 15 output on `r_regionkey`. Only 1 of the 5 non-empty regionkey 
partitions produces output, but all partitions still read their full shuffle 
data to perform the hash join — spending seconds reading millions of rows only 
to discard them.
   
   ### 27-second idle gap
   
   After this executor finishes its stage 15/16 partitions at `15:48:25`, it 
sits completely idle until `15:48:52` (27 seconds) waiting for the straggler 
executor(s) that received the heaviest `n_regionkey` partitions. This alone 
accounts for **51% of the total executor wall time**.
   
   ```
   Total wall time:   51.8s
   Actual work:       25.2s
   Idle (blocked):    26.5s
   Utilization:       49%
   ```
   
   ### Shuffle write throughput is fine
   
   The shuffle write mechanism itself performs at a consistent 10–16 Mrows/s 
across the heavy stages. The problem is not shuffle I/O — it's the skewed 
partition assignment.
   
   ### Four 23.87 GB shuffles of the same 800M-row dataset
   
   The `ShuffleReaderExec` stats show four distinct 800M-row / 23.87 GB shuffle 
reads flowing through the query plan. The `partsupp` and `supplier+partsupp` 
intermediate data is reshuffled multiple times through the join tree.
   
   ## Suggested improvements
   
   1. **Broadcast join for small dimension tables**: `nation` (25 rows) and 
`region` (5 rows) should be broadcast rather than hash-partitioned. This 
eliminates the skewed `Hash(n_regionkey, 64)` shuffle entirely.
   
   2. **Push region filter earlier**: If the region filter (`r_name = 
'EUROPE'`) is pushed down before the nation join, only ~5 nations participate, 
reducing intermediate data by ~4/5.
   
   3. **Detect low-cardinality partition keys**: The planner could check column 
statistics (NDV) before choosing a hash partition key. When NDV << partition 
count, it should prefer an alternative key or switch to broadcast.
   
   4. **Reduce redundant reshuffles**: The 800M-row supplier+partsupp result is 
shuffled 4 times through the join tree. Reordering joins or combining stages 
could reduce this.

