andygrove opened a new issue, #1635:
URL: https://github.com/apache/datafusion-ballista/issues/1635
## Describe the bug
`EXPLAIN ANALYZE` reports operator metrics that are inflated by
approximately the stage's partition count. The actual data flow and query
results are correct — only the displayed metrics are wrong, which makes
performance analysis misleading.
## To Reproduce
Run any TPC-H query with `EXPLAIN ANALYZE`, e.g. Q3 against SF100 with
`--partitions 8`:
```sql
EXPLAIN ANALYZE
select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue,
o_orderdate, o_shippriority
from customer, orders, lineitem
where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey =
o_orderkey
and o_orderdate < date '1995-03-15' and l_shipdate > date '1995-03-15'
group by l_orderkey, o_orderdate, o_shippriority
order by revenue desc, o_orderdate
limit 10;
```
Look at stage 5's lineitem `ShuffleReaderExec`:
```
ShuffleReaderExec: partitioning: Hash([l_orderkey@0], 8),
metrics=[output_rows=2.58 B, elapsed_compute=69.33s, ...]
```
The lineitem shuffle (stage 4) actually wrote 323M rows. The reader cannot
have emitted 2.58 B rows — that is `~8 × 323M`, exactly the partition-count
multiplier. The same pattern shows up on every stage: each `ShuffleReaderExec`
and the operators above it report `partition_count ×` the true row count.
I verified the actual row counts by adding a counter inside
`CoalescedShuffleReaderStream::poll_next` that sums every batch returned to
downstream operators. With `--partitions 8` the counter sums to ~320M
(correct), while EXPLAIN ANALYZE reports 2.58 B (inflated by 8).
The query results themselves are correct — the inflation is purely a display
artifact of how stage metrics are aggregated.
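For reference, the counting technique can be modelled in isolation. This is a toy sketch, not Ballista code: a plain iterator of per-batch row counts stands in for the async `CoalescedShuffleReaderStream`, and an `AtomicU64` plays the role of the counter added inside `poll_next`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Toy stand-in for an instrumented shuffle reader: wraps a source of
/// batches (represented here only by their row counts) and tallies every
/// row handed to downstream operators.
struct CountingReader<I> {
    inner: I,
    rows_seen: Arc<AtomicU64>,
}

impl<I: Iterator<Item = u64>> Iterator for CountingReader<I> {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        let batch = self.inner.next();
        if let Some(rows) = batch {
            // Count rows at the moment the batch leaves the reader,
            // mirroring a counter placed inside poll_next.
            self.rows_seen.fetch_add(rows, Ordering::Relaxed);
        }
        batch
    }
}

fn main() {
    let rows_seen = Arc::new(AtomicU64::new(0));
    let reader = CountingReader {
        inner: vec![1024u64, 512, 2048].into_iter(),
        rows_seen: Arc::clone(&rows_seen),
    };
    // Downstream "operators" drain the reader.
    let consumed: u64 = reader.sum();
    assert_eq!(consumed, rows_seen.load(Ordering::Relaxed));
    println!("rows emitted: {}", rows_seen.load(Ordering::Relaxed)); // prints "rows emitted: 3584"
}
```

The counter observes the true rows delivered downstream, independent of whatever the metrics aggregation later reports, which is what makes the `~320M` vs `2.58 B` discrepancy attributable to the display path.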
## Expected behavior
`output_rows` displayed in EXPLAIN ANALYZE should equal the actual rows the
operator emitted, summed across that stage's tasks (so for an N-task stage
where each task emits R rows, the metric should be N×R, not N²×R).
## Suspected location
The aggregation in
`ballista/scheduler/src/state/execution_stage.rs::combine_metrics_set`
repeatedly pushes each new task's metric values into the existing aggregated
`MetricsSet` and then calls `aggregate_by_name()`. I suspect the
previously-aggregated value is being summed back in on every task update,
producing the `partition_count ×` inflation. Other Count and Time metrics on
operators in the same stage (e.g. `elapsed_compute`, `build_time`, `join_time`)
appear similarly inflated.
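To make the suspected failure mode concrete, here is a toy model — hypothetical, not the actual `combine_metrics_set` code, with invented function names — of how counting an already-aggregated value once per task reproduces exactly the observed task-count multiplier: an N-task stage emitting R rows per task displays N²·R instead of N·R.

```rust
/// Correct: sum the raw per-task values exactly once.
fn aggregate_once(per_task_rows: &[u64]) -> u64 {
    per_task_rows.iter().sum()
}

/// Suspected bug pattern (hypothetical model): the stage-wide aggregate
/// is folded back in once per task update, so the displayed total is
/// task_count × the true total.
fn aggregate_twice(per_task_rows: &[u64]) -> u64 {
    let stage_total = aggregate_once(per_task_rows);
    per_task_rows.iter().map(|_| stage_total).sum()
}

fn main() {
    // 8 tasks, ~40M rows each: ~320M true rows for the stage.
    let per_task = [40_000_000u64; 8];
    assert_eq!(aggregate_once(&per_task), 320_000_000); // what the counter in poll_next sees
    assert_eq!(aggregate_twice(&per_task), 2_560_000_000); // ~2.56 B, the inflated figure
}
```

Note the numbers line up with the report: `8 × ~320M ≈ 2.56 B`, matching the `2.58 B` shown by EXPLAIN ANALYZE against ~323M actual rows.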
## Additional context
- Reproduced against current `main` with single-executor standalone cluster
- Inflation factor = stage's task count, consistent across `--partitions 2 /
4 / 8 / 16 / 32`
- Final query results are unaffected — actual data flow through the
operators is correct
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]