andygrove opened a new issue, #1907:
URL: https://github.com/apache/datafusion-ballista/issues/1907

   ## Describe the bug
   
   After upgrading to DataFusion 54 (PR #1906), distributed TPC-H execution is dramatically slower. CI's TPC-H SF10 job went from ~12 min on `main` to ~52 min and then failed with `No space left on device`. Example per-query timings (CI, SF10, `prefer_hash_join=false`):
   
   | query | main (DF53) | DF54 branch |
   |---|---:|---:|
   | q1 | 2.95 s | 40 s |
   | q2 | 3.38 s | 600 s |
   | q3 | 5.48 s | 143 s |
   | q5 | 15.8 s | died (disk full) |
   
   This reproduces locally and is characterized in detail below. Single-node DataFusion 54 is fast, so the regression is in Ballista's distributed execution path.
   
   ## To Reproduce
   
   SF10 parquet, 1 scheduler + 1 executor (`-c 8`), release build:
   
   ```
   tpch benchmark ballista --query 1 --partitions 16 --iterations 1 -c datafusion.optimizer.prefer_hash_join=false
   ```
   
   ## The core finding: tasks queued behind the first wave run ~8× slower
   
   With an executor of N task slots, the **first N tasks (wave 1) run fast; every subsequent wave runs ~8× slower for identical work.**
   
   | executor slots | partitions | waves | q1 time |
   |---:|---:|---:|---:|
   | 8 | 8 | 1 | 0.50 s |
   | 8 | 16 | 2 | 4.2 s |
   | 8 | 24 | 3 | 8.0 s |
   | 16 | 16 | 1 | 0.30 s |
   | 16 | 32 | 2 | 4.0 s |
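The waves column follows from ceiling division of partitions by executor slots. A minimal Rust sketch, assuming every task occupies exactly one slot and a new wave starts only once slots free up (the helper `waves` is hypothetical, not a Ballista API):

```rust
/// Number of scheduling waves needed to run `partitions` tasks on an executor
/// with `slots` concurrent task slots (assumes one task per slot and that a
/// new wave starts only once slots are freed).
fn waves(partitions: u32, slots: u32) -> u32 {
    assert!(slots > 0, "an executor needs at least one task slot");
    // Ceiling division: a partially filled last wave still counts.
    partitions.div_ceil(slots)
}

fn main() {
    // (executor slots, partitions) pairs from the q1 table.
    for (slots, partitions) in [(8, 8), (8, 16), (8, 24), (16, 16), (16, 32)] {
        println!(
            "{slots} slots, {partitions} partitions -> {} wave(s)",
            waves(partitions, slots)
        );
    }
}
```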
   
   - Single-node DataFusion 54 (same data, 16 target partitions): **0.21 s**.
   - DataFusion 53 (same Ballista commit), distributed, q1 @16part/8slots: **0.56 s** (no wave penalty).
   - The penalty resets per query and depends purely on timing/wave, not on partition index (with 16 slots, partitions 8–15 run in wave 1 and are fast).
   
   Per-task timing (q1 @16part/8slots): wave-1 tasks take ~0.45 s each; wave-2 tasks start immediately after wave 1 (no dispatch gap) but each take ~3.6 s of **compute** time.
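As a rough consistency check, a simple additive model reproduces the 8-slot q1 rows of the table above from these per-task numbers. This is a back-of-the-envelope sketch, assuming back-to-back waves with no dispatch gap and ignoring planning and shuffle overhead; `predicted_secs` is a hypothetical helper:

```rust
/// Back-of-the-envelope model (an assumption, not a measurement): wave-1
/// tasks take `t_first` seconds, every later wave takes `t_later` seconds,
/// waves run back to back with no dispatch gap, and stage overhead is ignored.
fn predicted_secs(waves: u32, t_first: f64, t_later: f64) -> f64 {
    if waves == 0 {
        return 0.0;
    }
    t_first + f64::from(waves - 1) * t_later
}

fn main() {
    // Per-task timings reported for q1 @16part/8slots.
    let (t_first, t_later) = (0.45, 3.6);
    println!("per-task slowdown: {:.1}x", t_later / t_first);
    // (waves, measured q1 seconds) from the 8-slot rows of the table.
    for (waves, measured) in [(1, 0.50), (2, 4.2), (3, 8.0)] {
        let model = predicted_secs(waves, t_first, t_later);
        println!("{waves} wave(s): model {model:.2} s, measured {measured:.2} s");
    }
}
```

The model gives 0.45 s, 4.05 s and 7.65 s for 1, 2 and 3 waves, within a few tenths of a second of the measured 0.50 s, 4.2 s and 8.0 s, and the ratio 3.6 / 0.45 is the ~8× per-task slowdown.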
   
   ## What the slow tasks are doing
   
   A CPU sample during the slow wave shows all 8 worker threads **busy** (not parked) in:
   `execute_query_stage → GroupedHashAggregateStream → ProjectionStream → FilterExecStream → CooperativeStream → FileStream → ParquetRecordBatchReader → snappy/RLE decode`.
   
   So later-wave tasks are fully parallel and CPU-bound, yet the **same parquet decode is ~8× slower** than in wave 1 for the same rows per task. The merged scan-stage metrics also report a very low `scan_efficiency_ratio` (~6%) and inflated `output_rows`/`files_opened` that scale with partition count (possibly partly a metric-merge artifact, but the wall-clock slowdown is real).
   
   ## Ruled out
   
   All of the following were tested with config propagation working (after merging the fix for client `datafusion.*` overrides):
   
   - Stripping `CooperativeExec` from the executed plan: no effect.
   - `tokio` version: unchanged (1.52.3) between the DF53 and DF54 branches.
   - Executor worker-thread priority (the `DedicatedExecutor` low priority): no effect.
   - Worker thread count (×8): no effect.
   - Sort-based vs hash-based shuffle: identical.
   - `datafusion.optimizer.repartition_file_scans=false`: no effect (it didn't change the plan).
   - `datafusion.optimizer.repartition_file_min_size` set huge (whole-file groups, no byte-range splitting): made it **much worse** (51 s), so byte-range splitting is helping, not hurting.
   - `datafusion.execution.parquet.pushdown_filters=true`: no effect.
   - Sharing one `RuntimeEnv` across tasks (so the DF54 `FileMetadataCache` persists instead of being fresh per task): no effect.
   
   Caveat: a test bypassing the `DedicatedExecutor` was inadvertently applied to the pull/poll path while the cluster ran PushStaged, so the "runtime bypass" result is still inconclusive.
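To rerun the config-level ablations from the list above consistently, the benchmark invocation from the reproduction steps can be generated with one `-c key=value` override per experiment. This sketch only builds command strings; it assumes the CLI accepts repeated `-c` flags, and the `repartition_file_min_size` value (1 TiB) is a hypothetical stand-in for the unspecified "huge" setting:

```rust
/// Base q1 invocation from the reproduction steps (SF10, 16 partitions).
const BASE: &str = "tpch benchmark ballista --query 1 --partitions 16 --iterations 1 \
                    -c datafusion.optimizer.prefer_hash_join=false";

/// Appends one `-c key=value` per override. Assumption: the benchmark CLI
/// accepts repeated `-c` flags in a single invocation.
fn bench_cmd(overrides: &[&str]) -> String {
    let mut cmd = String::from(BASE);
    for kv in overrides {
        cmd.push_str(" -c ");
        cmd.push_str(kv);
    }
    cmd
}

fn main() {
    // Config-level ablations from the "Ruled out" list. 1 TiB is a hypothetical
    // stand-in for the unspecified "huge" repartition_file_min_size value.
    let ablations: [(&str, &[&str]); 4] = [
        ("baseline", &[]),
        ("no file-scan repartitioning", &["datafusion.optimizer.repartition_file_scans=false"]),
        ("whole-file groups", &["datafusion.optimizer.repartition_file_min_size=1099511627776"]),
        ("parquet filter pushdown", &["datafusion.execution.parquet.pushdown_filters=true"]),
    ];
    for (name, overrides) in ablations {
        println!("# {name}\n{}", bench_cmd(overrides));
    }
}
```

Each printed pair is one experiment; the outcomes listed above come from the reporter's runs, and this helper only makes the flag combinations explicit.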
   
   ## Question / context
   
   Has anyone seen DataFusion 54 parquet/scan behavior where tasks that run after an initial batch decode the same data significantly slower within a shared process? (We hit DF54 parquet footer/metadata-caching changes when upgrading Comet.) Any pointers on what changed in DF54's parquet scan or runtime that could make later, fully parallel, CPU-bound scan tasks ~8× slower would help. Pinning this down likely needs a DF53↔54 scan-level comparison or CPU-counter profiling.
   
   Blocks #1906.
   

