Dandandan opened a new pull request, #21856:
URL: https://github.com/apache/datafusion/pull/21856

   ## Which issue does this PR close?
   
   - Closes #.
   
   ## Rationale for this change
   
   Apache DataFusion's grouped hash aggregate today scales beyond a thread's L3 cache by letting the in-memory hash table grow without bound, then spilling to disk on memory-pool exhaustion. Between "fits in L3" and "memory pool exhausted" there is a wide regime where the hash table thrashes the cache (roughly one cache miss per input row) with no mitigation.
   
   This PR adds a cache-efficient fallback inspired by Müller et al., *Cache-Efficient Aggregation: Hashing Is Sorting* (SIGMOD 2015): when the working set outgrows a thread's share of last-level cache, the operator radix-partitions the in-memory partial-aggregate state into a fixed number of bucketed runs and, once the input is drained, re-aggregates each bucket independently with a fresh, cache-resident hash table. Each bucket holds roughly `K / 32` groups for `K` distinct groups in total, so it stays small.
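   The shape of the algorithm, reduced to a toy model (a sketch only, not the PR's code: the real operator scatters `RecordBatch` rows and accumulator state, and the hash function and bit selection below are placeholder assumptions):
   
   ```rust
   use std::collections::HashMap;
   
   const NUM_RADIX_PARTITIONS: usize = 32; // 2^5, one run list per 5-bit bucket
   
   fn bucket_of(hash: u64) -> usize {
       // Low 5 bits shown here; which 5 bits the PR uses is an implementation detail.
       (hash & (NUM_RADIX_PARTITIONS as u64 - 1)) as usize
   }
   
   fn main() {
       // Stand-in for partial-aggregate state: (group key, partial COUNT).
       let partials: Vec<(u64, u64)> = (0..1_000u64).map(|k| (k % 200, 1)).collect();
   
       // Flush phase: scatter rows into bucketed runs by group-hash bucket.
       let mut runs: Vec<Vec<(u64, u64)>> = vec![Vec::new(); NUM_RADIX_PARTITIONS];
       for (key, cnt) in partials {
           let hash = key.wrapping_mul(0x9E37_79B9_7F4A_7C15); // toy hash
           runs[bucket_of(hash)].push((key, cnt));
       }
   
       // Drain phase: re-aggregate each bucket independently, so the hash
       // table only ever holds ~1/32 of the groups and stays cache-resident.
       let mut total_groups = 0;
       for run in &runs {
           let mut table: HashMap<u64, u64> = HashMap::new();
           for &(key, cnt) in run {
               *table.entry(key).or_insert(0) += cnt;
           }
           total_groups += table.len();
       }
       assert_eq!(total_groups, 200); // every key lands in exactly one bucket
   }
   ```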
   
   The trigger is the **working-set size** (sized to L3), not memory-pool 
exhaustion — by the time the pool is full the hash table has been thrashing 
cache for a while.
   
   ## What changes are included in this PR?
   
   - New `OutOfMemoryMode::RadixPartition` for non-Partial modes with 
`GroupOrdering::None`.
   - New `RadixPartitionState` holding `Vec<Vec<RecordBatch>>` indexed by 5-bit 
hash bucket (`NUM_RADIX_PARTITIONS = 32`).
   - Proactive cache-size trigger in the `ReadingInput` poll path: after each ingested batch, if `group_values.size() + accumulators.size() > threshold`, flush the hash table into bucketed runs and keep ingesting (see the sketch after this list).
   - Bucket-by-bucket drain phase reuses the existing `is_stream_merging` 
machinery (each bucket's runs become a `BucketStream` that flows back through 
`merge_batch`).
   - Fast-path bypass: if no radix flush ever fired during ingestion, fall 
through to the normal emit path so output ordering and tiny-query latency are 
unchanged.
   - Two new config options:
     - `datafusion.execution.aggregate_radix_partitioned` (bool, default 
**true**)
     - `datafusion.execution.aggregate_radix_partitioned_threshold_bytes` (bytes, default **32 MiB**)
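   
   A hedged model of the trigger and the fast-path flag (names are illustrative; the PR's actual structs and signatures may differ):
   
   ```rust
   type Run = Vec<u8>; // stand-in for a RecordBatch run
   
   struct RadixPartitionState {
       // One run list per 5-bit hash bucket (Vec<Vec<RecordBatch>> in the PR).
       runs: Vec<Vec<Run>>,
   }
   
   struct AggState {
       group_values_bytes: usize, // models group_values.size()
       accumulators_bytes: usize, // models accumulators.size()
       threshold_bytes: usize,    // aggregate_radix_partitioned_threshold_bytes
       radix: RadixPartitionState,
       flushed: bool, // never set => normal emit path (the fast-path bypass)
   }
   
   impl AggState {
       /// Called from the ReadingInput poll path after each ingested batch.
       fn maybe_radix_flush(&mut self) {
           if self.group_values_bytes + self.accumulators_bytes > self.threshold_bytes {
               self.flush_into_bucketed_runs();
               self.flushed = true;
           }
       }
   
       fn flush_into_bucketed_runs(&mut self) {
           // Real code would hash each group, scatter its rows into
           // self.radix.runs[bucket], then clear the hash table so
           // ingestion continues against an empty one.
           self.group_values_bytes = 0;
           self.accumulators_bytes = 0;
       }
   }
   
   fn main() {
       let mut st = AggState {
           group_values_bytes: 40 << 20, // 40 MiB working set
           accumulators_bytes: 0,
           threshold_bytes: 32 << 20, // the 32 MiB default
           radix: RadixPartitionState { runs: vec![Vec::new(); 32] },
           flushed: false,
       };
       st.maybe_radix_flush();
       assert!(st.flushed); // 40 MiB > 32 MiB, so the flush fired
   }
   ```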
   
   ### Known limitations (draft)
   
   - **Disk-spill fallback for an oversized single bucket is not implemented.** If a single bucket itself exceeds the memory budget during drain, the operator surfaces an error rather than recursively partitioning or falling back to disk. Recursive partitioning (the paper's approach, using progressively higher hash bits; a sketch follows this list) is the right fix, but this draft defers it. Tests that explicitly assert the disk-spill path now disable the flag.
   - **The 32 MiB default threshold is too low** for many ClickBench queries — 
see benchmark numbers below.
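   
   The recursion is cheap to express: each level selects the next 5 bits of the same 64-bit group hash, so an oversized bucket can be split 32 ways again without rehashing. A sketch under that assumption (the exact bit layout is a guess; the PR only commits to "progressively higher hash bits"):
   
   ```rust
   const BITS_PER_LEVEL: u32 = 5; // 32 buckets per level, matching the PR
   
   /// Bucket for a given recursion depth: level 0 uses hash bits 0..5,
   /// level 1 uses bits 5..10, and so on.
   fn bucket_at_level(hash: u64, level: u32) -> usize {
       ((hash >> (level * BITS_PER_LEVEL)) & 0b1_1111) as usize
   }
   
   fn main() {
       let h = 999u64; // 0b11111_00111: bits 0..5 = 7, bits 5..10 = 31
       assert_eq!(bucket_at_level(h, 0), 7);
       assert_eq!(bucket_at_level(h, 1), 31);
   }
   ```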
   
   ## Are these changes tested?
   
   Three new unit tests in `datafusion-physical-plan` 
(`test_radix_partitioned_high_cardinality`, `_low_cardinality`, 
`_single_group`) drive a `Single`-mode aggregate with the cache-size threshold 
pinned to 64 bytes (so the radix flush fires repeatedly) and assert the output 
multiset matches the non-radix path on the same input.
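   
   Not the PR's test code, but the same order-insensitive check is easy to express against the public API once the two config keys land; `collect_sorted` below is a hypothetical helper (and `data.csv` a placeholder input):
   
   ```rust
   use datafusion::error::Result;
   use datafusion::prelude::*;
   
   async fn collect_sorted(radix_on: bool, sql: &str) -> Result<Vec<String>> {
       let cfg = SessionConfig::new()
           .set_bool("datafusion.execution.aggregate_radix_partitioned", radix_on)
           // Tiny threshold so the radix flush fires repeatedly when enabled.
           .set_u64(
               "datafusion.execution.aggregate_radix_partitioned_threshold_bytes",
               64,
           );
       let ctx = SessionContext::new_with_config(cfg);
       ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;
       let batches = ctx.sql(sql).await?.collect().await?;
       let mut rows: Vec<String> =
           datafusion::arrow::util::pretty::pretty_format_batches(&batches)?
               .to_string()
               .lines()
               .map(str::to_owned)
               .collect();
       rows.sort(); // compare as a multiset: ignore row order
       Ok(rows)
   }
   ```
   
   The assertion is then `collect_sorted(true, q).await? == collect_sorted(false, q).await?` for each query `q`.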
   
   The pre-existing spill-specific tests 
(`test_aggregate_with_spill_if_necessary`, 
`test_sort_reservation_fails_during_spill`, `aggregate_source_*_with_spill`) 
explicitly disable the flag in their `TaskContext` so they continue to exercise 
the disk-spill code path.
   
   All 89 `aggregates::` tests pass.
   
   ## Are there any user-facing changes?
   
   The two new config keys appear in `information_schema.df_settings` and 
`docs/source/user-guide/configs.md`. No public Rust API changes.
   
   ## Preliminary ClickBench numbers (single `hits.parquet`, 3 iterations)
   
   | Metric | Radix off | Radix on (default) |
   | --- | --- | --- |
   | Total time | 37.95 s | 37.48 s (**~1.2% faster**) |
   | Faster | — | 6 queries |
   | Slower | — | 16 queries |
   | No change | — | 21 queries |
   
   Notable wins:
   - Q19: **1.50x** faster (65 → 44 ms)
   - Q32: **1.40x** faster (4.29 s → 3.06 s)
   - Q18: 1.16x faster (3.15 s → 2.71 s)
   
   Notable regressions:
   - Q31: 1.33x slower (458 → 610 ms)
   - Q13: 1.29x slower (`GROUP BY URL`, ~10M distinct)
   - Q8: 1.25x slower
   - Q4–Q9, Q14–Q16: all 1.13–1.25x slower
   
   The regression pattern is consistent with a too-low threshold: queries whose per-thread hash table sits just above 32 MiB are forced into an extra round trip through the bucket drain even though the data still fit fine in L3. Threshold tuning plus a reduction-factor gate (only flush when recent batches show poor aggregation) are the obvious next steps before this is ready to merge.
   
   ## Follow-ups
   
   - Tune the default threshold (try 128 MiB / 256 MiB; it likely needs to be CPU-count-aware).
   - Add a reduction-factor gate analogous to `SkipAggregationProbe`, so we don't flush a hash table that is aggregating well (a sketch follows this list).
   - Implement recursive partitioning with higher-order hash bits for oversized 
buckets, with disk-spill as the final fallback.
   - Run with `--iterations 5+` and on the partitioned dataset for less 
variance.
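   
   One possible shape for the reduction-factor gate, by analogy with `SkipAggregationProbe` (all names and the 0.8 cutoff are assumptions, not code from this PR):
   
   ```rust
   struct RadixFlushGate {
       input_rows: usize, // rows ingested since the last check
       groups: usize,     // distinct groups those rows produced
   }
   
   impl RadixFlushGate {
       /// Only allow a radix flush when recent batches aggregated poorly,
       /// i.e. nearly every input row opened a new group.
       fn should_flush(&self) -> bool {
           if self.input_rows == 0 {
               return false;
           }
           let reduction = self.groups as f64 / self.input_rows as f64;
           reduction > 0.8 // mostly-unique keys: hashing is not reducing data
       }
   }
   
   fn main() {
       // 1000 rows collapsing into 10 groups: aggregating well, don't flush.
       assert!(!RadixFlushGate { input_rows: 1000, groups: 10 }.should_flush());
       // 1000 rows producing 950 groups: poor reduction, a flush is worthwhile.
       assert!(RadixFlushGate { input_rows: 1000, groups: 950 }.should_flush());
   }
   ```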
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)

