andygrove opened a new issue, #1635:
URL: https://github.com/apache/datafusion-ballista/issues/1635

   ## Describe the bug
   
   `EXPLAIN ANALYZE` reports operator metrics that are inflated by a factor roughly equal to the stage's partition count. The actual data flow and query results are correct; only the displayed metrics are wrong, which makes performance analysis misleading.
   
   ## To Reproduce
   
   Run any TPC-H query with `EXPLAIN ANALYZE`, e.g. Q3 against SF100 with 
`--partitions 8`:
   
   ```sql
   EXPLAIN ANALYZE
   select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, 
o_orderdate, o_shippriority
   from customer, orders, lineitem
   where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = 
o_orderkey
     and o_orderdate < date '1995-03-15' and l_shipdate > date '1995-03-15'
   group by l_orderkey, o_orderdate, o_shippriority
   order by revenue desc, o_orderdate
   limit 10;
   ```
   
   Look at stage 5's lineitem `ShuffleReaderExec`:
   
   ```
   ShuffleReaderExec: partitioning: Hash([l_orderkey@0], 8), 
metrics=[output_rows=2.58 B, elapsed_compute=69.33s, ...]
   ```
   
   The lineitem shuffle (stage 4) actually wrote 323M rows. The reader cannot have emitted 2.58 B rows; that is `~8 × 323M`, exactly the partition-count multiplier. The same pattern shows up in every stage: each `ShuffleReaderExec` and the operators above it report `partition_count ×` the true row count.
   
   I verified the actual row counts by adding a counter inside 
`CoalescedShuffleReaderStream::poll_next` that sums every batch returned to 
downstream operators. With `--partitions 8` the counter sums to ~320M 
(correct), while EXPLAIN ANALYZE reports 2.58 B (inflated by 8).
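   
   The counter itself was an ad-hoc local patch; the sketch below shows the same idea as a standalone wrapper stream rather than the actual change to `CoalescedShuffleReaderStream` (the `CountingStream` name and its fields are illustrative only):
   
   ```rust
   // Hypothetical verification helper: wrap any stream of RecordBatches and add
   // each polled batch's row count to a shared counter, mirroring what the
   // ad-hoc counter in CoalescedShuffleReaderStream::poll_next measured.
   use std::pin::Pin;
   use std::sync::atomic::{AtomicUsize, Ordering};
   use std::sync::Arc;
   use std::task::{Context, Poll};
   
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::error::Result;
   use futures::Stream;
   
   struct CountingStream<S> {
       inner: S,
       rows: Arc<AtomicUsize>,
   }
   
   impl<S> Stream for CountingStream<S>
   where
       S: Stream<Item = Result<RecordBatch>> + Unpin,
   {
       type Item = Result<RecordBatch>;
   
       fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
           let this = self.get_mut();
           let poll = Pin::new(&mut this.inner).poll_next(cx);
           // Sum the rows of every batch handed to the downstream operator.
           if let Poll::Ready(Some(Ok(batch))) = &poll {
               this.rows.fetch_add(batch.num_rows(), Ordering::Relaxed);
           }
           poll
       }
   }
   ```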
   
   The query results themselves are correct — the inflation is purely a display 
artifact of how stage metrics are aggregated.
   
   ## Expected behavior
   
   `output_rows` displayed in EXPLAIN ANALYZE should equal the actual number of rows the operator emitted, summed across that stage's tasks: for an N-task stage where each task emits R rows, the metric should be N×R, not N²×R.
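   
   Plugging in the numbers from the reproduction above (and assuming, for illustration, that the ~323M rows split evenly across the 8 tasks), the expected and reported values line up with that formula:
   
   ```rust
   fn main() {
       // Worked instance of N×R vs N²×R; the per-task row count is an assumed
       // even split of the ~323M total, purely for illustration.
       let n: u64 = 8;                       // tasks in the stage
       let r: u64 = 323_000_000 / 8;         // ≈ 40.4M rows per task
       assert_eq!(n * r, 323_000_000);       // expected metric: N × R ≈ 323M
       assert_eq!(n * n * r, 2_584_000_000); // reported metric: N² × R ≈ 2.58 B
   }
   ```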
   
   ## Suspected location
   
   The aggregation in 
`ballista/scheduler/src/state/execution_stage.rs::combine_metrics_set` 
repeatedly pushes each new task's metric values into the existing aggregated 
`MetricsSet` and then calls `aggregate_by_name()`. I suspect the 
previously-aggregated value is being summed back in on every task update, 
producing the `partition_count ×` inflation. Other Count and Time metrics on 
operators in the same stage (e.g. `elapsed_compute`, `build_time`, `join_time`) 
appear similarly inflated.
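   
   For illustration only, the sketch below builds a toy version of the suspected double counting directly on DataFusion's public metrics types; it is not the actual `combine_metrics_set` code. The first loop folds the previous aggregate back in together with every per-task metric seen so far, the second keeps one raw entry per task and aggregates once for display. The exact inflation factor depends on the real update sequence, but the mechanism is the same: re-aggregating an already-aggregated set counts earlier tasks more than once.
   
   ```rust
   use std::sync::Arc;
   
   use datafusion::physical_plan::metrics::{Count, Metric, MetricValue, MetricsSet};
   
   // Build a single output_rows metric with a fixed value (helper for the toy).
   fn output_rows_metric(rows: usize) -> Arc<Metric> {
       let count = Count::new();
       count.add(rows);
       Arc::new(Metric::new(MetricValue::OutputRows(count), None))
   }
   
   fn main() {
       let n_tasks = 8;
       let rows_per_task = 1_000;
       let task_metrics: Vec<Arc<Metric>> = (0..n_tasks)
           .map(|_| output_rows_metric(rows_per_task))
           .collect();
   
       // Suspected pattern: on every task update, push the previous aggregate
       // plus all per-task metrics reported so far, then re-aggregate.
       let mut aggregated = MetricsSet::new();
       for k in 1..=n_tasks {
           let mut combined = MetricsSet::new();
           for m in aggregated.iter() {
               combined.push(Arc::clone(m));
           }
           for m in &task_metrics[..k] {
               combined.push(Arc::clone(m));
           }
           aggregated = combined.aggregate_by_name();
       }
       // Reports Some(36000): earlier tasks are counted once per update
       // (1 + 2 + ... + 8 = 36), instead of the true 8 × 1000 = 8000.
       println!("inflated aggregate: {:?}", aggregated.output_rows());
   
       // Alternative: keep one raw entry per task on the stage and only
       // aggregate when the metrics are rendered.
       let mut per_task = MetricsSet::new();
       for m in &task_metrics {
           per_task.push(Arc::clone(m));
       }
       // Reports Some(8000), the true stage total.
       println!("per-task aggregate: {:?}", per_task.aggregate_by_name().output_rows());
   }
   ```
   
   If that is indeed the mechanism, one fix along these lines would be for the stage to keep the raw per-task metrics and call `aggregate_by_name()` only when the metrics are rendered, as in the second loop above.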
   
   ## Additional context
   
   - Reproduced against current `main` with a single-executor standalone cluster
   - Inflation factor equals the stage's task count, consistent across `--partitions 2 / 4 / 8 / 16 / 32`
   - Final query results are unaffected; the actual data flow through the operators is correct

