hhhizzz opened a new issue, #23178:
URL: https://github.com/apache/datafusion/issues/23178

   ### Describe the bug
   
   ### Problem
   
   After #23055, hash aggregate output uses `EmitTo::First(batch_size)` to emit result batches incrementally.
   
   This can regress high-cardinality aggregate output, because `EmitTo::First(n)` currently means:
   
   1. emit the first `n` groups,
   2. remove them,
   3. shift all remaining group indexes down by `n`.
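   To make the three steps concrete, here is a minimal Rust sketch of a toy store that follows the same contract. `ToyGroupValues`, `intern`, and `emit_first` are hypothetical names; the real `GroupValuesColumn` uses `hashbrown::HashTable` and columnar storage, so this only models the semantics, not the implementation:

   ```rust
   use std::collections::HashMap;

   /// Hypothetical, heavily simplified stand-in for a group-values store.
   /// It is NOT DataFusion's `GroupValuesColumn`; it only models the
   /// `EmitTo::First(n)` contract (emit, remove, renumber).
   struct ToyGroupValues {
       /// Group keys in group-index order.
       values: Vec<String>,
       /// Lookup state used by `intern`: key -> group index.
       lookup: HashMap<String, usize>,
   }

   impl ToyGroupValues {
       fn new() -> Self {
           Self { values: Vec::new(), lookup: HashMap::new() }
       }

       /// Returns the group index for `key`, assigning a new one if needed.
       fn intern(&mut self, key: &str) -> usize {
           if let Some(&idx) = self.lookup.get(key) {
               return idx;
           }
           let idx = self.values.len();
           self.values.push(key.to_string());
           self.lookup.insert(key.to_string(), idx);
           idx
       }

       /// Models `EmitTo::First(n)`.
       fn emit_first(&mut self, n: usize) -> Vec<String> {
           let n = n.min(self.values.len());
           // Steps 1 and 2: take the first `n` groups out of the store.
           let emitted: Vec<String> = self.values.drain(..n).collect();
           // Step 3: drop lookup entries for emitted groups and shift the
           // survivors down by `n`. This visits every remaining entry.
           self.lookup.retain(|_, idx| {
               if *idx < n {
                   false
               } else {
                   *idx -= n;
                   true
               }
           });
           emitted
       }
   }
   ```

   After `emit_first(2)` on groups `a, b, c, d`, the lookup maps `c -> 0` and `d -> 1`, so a later `intern("c")` still resolves correctly. That renumbering is exactly what terminal output does not need.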
   
   That is useful when aggregation may continue after emitting. But in the final `AggregateHashTableState::Outputting` phase, no more `intern` calls are expected.
   
   Despite that, `GroupValuesColumn::emit(EmitTo::First(n))` still maintains lookup state for future interning: on every output batch it calls `HashTable::retain` and rewrites the indexes of all remaining groups.
   
   Because each emit walks every group that is still stored, terminal output can spend roughly:
   
   ```text
   O(groups × output_batches)
   ```
   
   maintaining lookup structures that are no longer needed.
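   A rough way to see where the quadratic term comes from, assuming every emit walks all groups still in the lookup table (as the `retain` call does), is to count visited entries. `retain_visits` and `output_batches` are hypothetical helpers for illustration only; they model the cost, they do not measure it:

   ```rust
   /// Hypothetical cost model: how many lookup entries a `retain`-style
   /// pass visits when `total_groups` groups are emitted `batch_size` at a
   /// time and every emit walks all groups that are still stored.
   fn retain_visits(total_groups: u64, batch_size: u64) -> u64 {
       assert!(batch_size > 0, "batch_size must be positive");
       let mut remaining = total_groups;
       let mut visits = 0;
       while remaining > 0 {
           // Each emit visits every entry still in the table...
           visits += remaining;
           // ...then removes at most one batch worth of groups.
           remaining -= remaining.min(batch_size);
       }
       visits
   }

   /// Number of output batches for the same inputs (ceiling division).
   fn output_batches(total_groups: u64, batch_size: u64) -> u64 {
       (total_groups + batch_size - 1) / batch_size
   }
   ```

   With `B` output batches this sums to about `groups × B / 2` visits, which is the `O(groups × output_batches)` shape above; an output path that skips lookup maintenance would touch each group only once.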
   
   ### Reproduction
   
   One way to reproduce this is with TPC-DS SF10 query 23, using the same Arrow/DataFusion dependency setup and the same runtime options for both commits.
   
   Compare:
   
   - parent commit: `322f6862e0744207ac24b5fedda3fb6716e654c3`
   - regressed commit: `681ba9bc7a45b5d3de31438a0505b0ce1e4854cb`
   - PR: #23055
   
   Example command shape:
   
   ```bash
   DATAFUSION_EXECUTION_TARGET_PARTITIONS=24 \
   DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
   DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS=true \
   DATAFUSION_EXECUTION_PARQUET_PRUNING=true \
   DATAFUSION_OPTIMIZER_ENABLE_DYNAMIC_FILTER_PUSHDOWN=true \
   DATAFUSION_OPTIMIZER_ENABLE_JOIN_DYNAMIC_FILTER_PUSHDOWN=true \
   DATAFUSION_OPTIMIZER_ENABLE_TOPK_DYNAMIC_FILTER_PUSHDOWN=true \
   DATAFUSION_OPTIMIZER_ENABLE_AGGREGATE_DYNAMIC_FILTER_PUSHDOWN=true \
   ./target/release/dfbench tpcds \
     --query 23 \
     --iterations 3 \
     --path /path/to/tpcds_sf10 \
     --query_path datafusion/core/tests/tpc-ds \
     --prefer_hash_join true
   ```
   
   For attribution, run once with `--debug` and inspect the `AggregateExec` metrics.
   
   ### Observed TPC-DS performance
   
   On SF10 q23 with 24 target partitions:
   
   ```text
   parent  322f6862e0: ~2.43s mean
   #23055  681ba9bc7a: ~8.04s mean
   ```
   
   Diagnostic run details:
   
   ```text
   parent perf iterations:
   2466.594 ms, 2414.212 ms, 2415.373 ms
   
   681ba9bc7a perf iterations:
   8091.431 ms, 7987.224 ms, 8026.777 ms
   ```
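   The means quoted above can be re-derived from the three iterations of each run; the sketch below also computes the implied slowdown. The constants are copied from the diagnostic output; the helper names are hypothetical:

   ```rust
   /// Per-iteration timings (ms) copied from the diagnostic runs above.
   const PARENT_MS: [f64; 3] = [2466.594, 2414.212, 2415.373];
   const REGRESSED_MS: [f64; 3] = [8091.431, 7987.224, 8026.777];

   /// Arithmetic mean of a non-empty slice.
   fn mean(xs: &[f64]) -> f64 {
       xs.iter().sum::<f64>() / xs.len() as f64
   }

   /// How many times slower the regressed commit is, on average.
   fn slowdown() -> f64 {
       mean(&REGRESSED_MS) / mean(&PARENT_MS)
   }
   ```

   This gives about 2432.06 ms for the parent, about 8035.14 ms for `681ba9bc7a`, and a slowdown of roughly 3.3×.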
   
   The regression is isolated to aggregate output. The problematic node is a final partitioned aggregate:
   
   ```text
   AggregateExec: mode=FinalPartitioned,
     gby=[i_item_sk, d_date],
     aggr=[count(*)],
     output_rows ~= 18.36M,
     output_batches ~= 2.26K
   ```
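   Since a final aggregate emits one row per group, `output_rows` is also the group count. Assuming DataFusion's default `batch_size` of 8192 (the configured value is not stated here) and, for the per-partition figures, an even spread over the 24 partitions (also an assumption), a quick check suggests the batch count matches nearly full 8192-row batches, so each partition would repeat the `retain` pass on the order of 94 times over roughly 765K groups. All names below are hypothetical:

   ```rust
   /// Values read off the `AggregateExec` metrics above (already rounded there).
   const OUTPUT_ROWS: f64 = 18.36e6;
   const OUTPUT_BATCHES: f64 = 2.26e3;
   const TARGET_PARTITIONS: f64 = 24.0;
   /// Assumed: DataFusion's default `datafusion.execution.batch_size`.
   const DEFAULT_BATCH_SIZE: f64 = 8192.0;

   /// Average rows per emitted batch across all partitions.
   fn rows_per_batch() -> f64 {
       OUTPUT_ROWS / OUTPUT_BATCHES
   }

   /// Bounds on the batch count if every partition emits full batches
   /// except for at most one partial batch at its end.
   fn batch_count_bounds() -> (f64, f64) {
       let exact = OUTPUT_ROWS / DEFAULT_BATCH_SIZE;
       (exact.ceil(), (exact + TARGET_PARTITIONS).floor())
   }

   /// Per-partition sizes, assuming an even spread (an assumption).
   fn groups_per_partition() -> f64 {
       OUTPUT_ROWS / TARGET_PARTITIONS
   }

   fn batches_per_partition() -> f64 {
       OUTPUT_BATCHES / TARGET_PARTITIONS
   }
   ```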
   
   At the regressed commit, debug metrics showed this node spending about 32s of summed `emitting_time`, while aggregation time stayed around 3s.
   
   A profile showed the hot path as:
   
   ```text
   AggregateHashTable::next_output_batch
     -> GroupValuesColumn::emit(EmitTo::First(batch_size))
        -> hashbrown::HashTable::retain
   ```
   
   ### Possible fix
   
   @2010YOUY01 could you help take a look? I'm happy to contribute the code.
   
   Add an explicit terminal-output transition, for example:
   
   ```rust
   GroupValues::release_interning_state()
   ```
   
   called from `AggregateHashTable::start_outputting()`.
   
   For `GroupValuesColumn`, this could clear lookup-only state such as:
   
   - hash map,
   - collision lists,
   - temporary intern buffers.
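   A minimal sketch of the proposed transition, under assumptions: only the name `release_interning_state` comes from this proposal; the `GroupValues` trait shape, its default no-op body, `ToyColumnGroupValues`, and the free function `start_outputting` are hypothetical stand-ins, not DataFusion's API. Lookup state is held in an `Option` so that releasing it also turns the renumbering pass in `emit_first` into a no-op:

   ```rust
   use std::collections::HashMap;

   /// Hypothetical trait shape; only the method name comes from the proposal.
   trait GroupValues {
       fn intern(&mut self, key: &str) -> usize;
       fn emit_first(&mut self, n: usize) -> Vec<String>;
       /// Called once before terminal output. Default: keep everything.
       fn release_interning_state(&mut self) {}
   }

   struct ToyColumnGroupValues {
       values: Vec<String>,
       /// Lookup-only state: needed only while `intern` may still be called.
       lookup: Option<HashMap<String, usize>>,
   }

   impl GroupValues for ToyColumnGroupValues {
       fn intern(&mut self, key: &str) -> usize {
           let lookup = self
               .lookup
               .as_mut()
               .expect("intern called after release_interning_state");
           if let Some(&idx) = lookup.get(key) {
               return idx;
           }
           let idx = self.values.len();
           self.values.push(key.to_string());
           lookup.insert(key.to_string(), idx);
           idx
       }

       fn emit_first(&mut self, n: usize) -> Vec<String> {
           let n = n.min(self.values.len());
           // Note: `Vec::drain` still shifts the surviving keys; that cost is
           // outside what this sketch addresses.
           let emitted: Vec<String> = self.values.drain(..n).collect();
           // Only pay for retain/renumber while lookup state still exists.
           if let Some(lookup) = self.lookup.as_mut() {
               lookup.retain(|_, idx| {
                   if *idx < n {
                       false
                   } else {
                       *idx -= n;
                       true
                   }
               });
           }
           emitted
       }

       fn release_interning_state(&mut self) {
           // Drop the hash map; a real implementation would also clear
           // collision lists and temporary intern buffers.
           self.lookup = None;
       }
   }

   /// Hypothetical analogue of `AggregateHashTable::start_outputting()`.
   fn start_outputting<G: GroupValues>(group_values: &mut G) {
       group_values.release_interning_state();
   }
   ```

   A default no-op body keeps other `GroupValues` implementations unchanged; only implementations with lookup-only state would need to override it.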
   
   After that, terminal output would no longer pay to maintain lookup state that will never be used again.
   
   Another option is to add an output-only cursor/range API for terminal output, separate from `EmitTo::First`, so that `EmitTo::First` can keep its current destructive renumbering semantics for paths that still need it.
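   The cursor/range alternative could look roughly like the sketch below. `OutputCursor` is a hypothetical type: it only hands out consecutive index ranges over groups that are already final, so nothing is removed or renumbered, and each range would then drive building one output batch:

   ```rust
   use std::ops::Range;

   /// Hypothetical output-only cursor over finalized groups.
   struct OutputCursor {
       total_groups: usize,
       pos: usize,
       batch_size: usize,
   }

   impl OutputCursor {
       fn new(total_groups: usize, batch_size: usize) -> Self {
           assert!(batch_size > 0, "batch_size must be positive");
           Self { total_groups, pos: 0, batch_size }
       }
   }

   impl Iterator for OutputCursor {
       type Item = Range<usize>;

       /// Yields the next `[start, end)` range of group indexes, or `None`
       /// once every group has been handed out.
       fn next(&mut self) -> Option<Range<usize>> {
           if self.pos >= self.total_groups {
               return None;
           }
           let end = (self.pos + self.batch_size).min(self.total_groups);
           let range = self.pos..end;
           self.pos = end;
           Some(range)
       }
   }
   ```

   Because the cursor never mutates the group store, each batch costs work proportional to its own size, and `EmitTo::First` is left untouched for the paths that still need renumbering.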
   
   A short-term fallback is to materialize all output once and slice it when the estimated output size is safe, but that carries a higher peak-memory risk.
   

