Copilot commented on code in PR #22518:
URL: https://github.com/apache/datafusion/pull/22518#discussion_r3331818289
##########
datafusion/sqllogictest/test_files/information_schema.slt:
##########
@@ -416,8 +418,10 @@ datafusion.execution.parquet.writer_version 1.0 (writing)
Sets parquet writer ve
datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum
required density of join keys on the build side to consider a perfect hash join
(see `HashJoinExec` for more details). Density is calculated as: `(number of
rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual
key density > this value. Currently only supports cases where
build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX
will be added in the future.
datafusion.execution.perfect_hash_join_small_build_threshold 1024 A perfect
hash join (see `HashJoinExec` for more details) will be considered if the range
of keys (max - min) on the build side is < this threshold. This provides a fast
path for joins with very small key ranges, bypassing the density check.
Currently only supports cases where build_side.num_rows() < u32::MAX. Support
for build_side.num_rows() >= u32::MAX will be added in the future.
datafusion.execution.planning_concurrency 13 Fan-out during initial physical
planning. This is mostly use to plan `UNION` children in parallel. Defaults to
the number of CPU cores on the system
+datafusion.execution.skip_partial_aggregation_ab_sampling_rows 10000 Number of
input rows used in the A/B sampling window after the initial partial probe
completes. During this window the operator routes input through the passthrough
(`transform_to_states`) path so the probe can measure `passthrough_ns/row` and
compare it against the previously measured `partial_ns/row`. Default 10000 —
large enough to amortise per-row noise, small enough to be cheap if the
decision turns out to be "keep partial".
datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8
Aggregation ratio (number of distinct groups / number of input rows) threshold
for skipping partial aggregation. If the value is greater then partial
aggregation will skip aggregation for further input
datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000
Number of input rows partial aggregation partition should process, before
aggregation ratio check and trying to switch to skipping aggregation mode
+datafusion.execution.skip_partial_aggregation_use_cost_model true
(experimental) When true, apply a *secondary* skip rule on top of
`skip_partial_aggregation_probe_ratio_threshold`: skip partial aggregation when
the measured ratio is at least `skip_partial_aggregation_cost_min_ratio`
(default 0.5). Targets ClickBench Q18-shape queries where the ratio (~0.56)
sits just below the fixed 0.8 threshold so partial agg keeps running, but the
absolute work (heavy variable-length keys, complex aggregates) makes it
net-negative. Empirical motivation: lowering the global ratio threshold to 0.6
fixes Q18 (1.73× faster) but risks regressing low-cost queries at similar
ratios. This flag exposes the lower threshold as a separate, opt-in knob.
Whether the cost-aware signal (`partial_agg_probe_ns_per_row` metric) can
replace this static threshold is an open question — for now the metric is
reported alongside so callers can evaluate.
Review Comment:
This expected settings description is out of sync with the implementation:
it mentions a `skip_partial_aggregation_cost_min_ratio` setting and a
`partial_agg_probe_ns_per_row` metric that don't exist. The output should
describe the actual A/B cost model (partial vs passthrough ns/row) and the
existing ratio short-circuit.
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -851,6 +1085,62 @@ impl Stream for GroupedHashAggregateStream {
}
}
+ ExecutionState::AbSampling => {
+ // Mirror of `SkippingAggregation` — passthrough via
+ // `transform_to_states` — except that:
+ // * the partial hash table is NOT emitted (we may
+ // still revert to it),
+ // * the probe observes per-row timing via
+ // `elapsed_compute`,
+ // * after each batch we check whether the probe
+ // has finalised: skip (emit hash + switch to
+ // `SkippingAggregation`) or keep partial
+ // (return to `ReadingInput`).
+ match ready!(self.input.poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ let _timer = elapsed_compute.timer();
+ let input_rows = batch.num_rows();
+ let states = self.transform_to_states(&batch)?;
+ if let Some(probe) = self.skip_aggregation_probe.as_mut() {
+ probe.observe_ab_batch(input_rows);
Review Comment:
`observe_ab_batch` (and `finalize_ab_decision`) uses
`elapsed_compute.value()` for timing, but it's called while the
`elapsed_compute.timer()` guard for this batch is still live. `Time::value()`
does not include in-flight timer duration, so the passthrough ns/row
measurement can be undercounted (especially for small
`skip_partial_aggregation_ab_sampling_rows`). Stop/restart the timer guard
before calling into the probe so the metric is up to date.
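
To illustrate the concern, here is a minimal sketch using a simplified stand-in for the metric and its guard. The `Time` and `TimerGuard` types below are hypothetical and only mimic the behaviour described above (the counter is updated when the guard is stopped or dropped); they are not DataFusion's actual types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Simplified stand-in for a cumulative time metric (hypothetical type).
#[derive(Clone, Default)]
struct Time {
    nanos: Arc<AtomicUsize>,
}

impl Time {
    /// Nanoseconds recorded so far. Time held by a live guard is NOT included.
    fn value(&self) -> usize {
        self.nanos.load(Ordering::Relaxed)
    }

    /// Start timing; the elapsed time is added when the guard stops or drops.
    fn timer(&self) -> TimerGuard<'_> {
        TimerGuard { inner: self, start: Some(Instant::now()) }
    }
}

struct TimerGuard<'a> {
    inner: &'a Time,
    start: Option<Instant>,
}

impl TimerGuard<'_> {
    /// Flush the in-flight duration into the metric.
    fn stop(&mut self) {
        if let Some(start) = self.start.take() {
            let ns = start.elapsed().as_nanos() as usize;
            self.inner.nanos.fetch_add(ns, Ordering::Relaxed);
        }
    }

    /// Resume timing after a `stop`.
    fn restart(&mut self) {
        self.start = Some(Instant::now());
    }
}

impl Drop for TimerGuard<'_> {
    fn drop(&mut self) {
        self.stop();
    }
}

/// Returns (snapshot taken while the guard is live, snapshot taken after `stop`).
fn snapshots(work: Duration) -> (usize, usize) {
    let elapsed_compute = Time::default();
    let mut timer = elapsed_compute.timer();
    std::thread::sleep(work); // stands in for the per-batch work
    let while_live = elapsed_compute.value(); // misses the current batch
    timer.stop();
    let after_stop = elapsed_compute.value(); // includes the current batch
    timer.restart(); // keep timing the remainder of the batch
    (while_live, after_stop)
}
```

In this sketch the first snapshot is still zero after the work has run, while the second one includes it. The same stop, snapshot, restart ordering around `probe.observe_ab_batch(input_rows)` would presumably avoid the undercount.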
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -118,88 +129,294 @@ struct SpillState {
// Metrics related to spilling are managed inside `spill_manager`
}
-/// Tracks if the aggregate should skip partial aggregations
+/// Three phases of the cost-aware skip decision.
+///
+/// 1. `Partial` — accumulate input through the hash table (normal
+/// partial-agg path), measuring `partial_ns/row` and the
+/// `num_groups/input_rows` ratio over the first
+/// `probe_rows_threshold` rows.
+/// 2. `AbSampling` — route the next `ab_sampling_rows` of input through
+/// the passthrough path (`transform_to_states`) to measure
+/// `passthrough_ns/row`. The hash table built so far is kept;
+/// nothing is emitted yet.
+/// 3. `Locked { should_skip }` — final decision. Skip when
+/// `ratio > passthrough_ns/row / partial_ns/row` (the cost-aware
+/// crossover); otherwise revert to partial agg for the rest of the
+/// stream.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum ProbePhase {
+ Partial,
+ AbSampling,
+ Locked { should_skip: bool },
+}
+
+/// Tracks if the aggregate should skip partial aggregations.
+///
+/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`].
///
-/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`]
+/// The probe runs a short A/B sampling window that measures both the
+/// partial-agg per-row cost and the passthrough per-row cost on real
+/// input, then makes a cost-based skip decision without relying on a
+/// hardcoded ratio cutoff. `use_cost_model = false` falls back to the
+/// original behaviour: a single bare ratio check at probe close.
struct SkipAggregationProbe {
// ========================================================================
- // PROPERTIES:
- // These fields are initialized at the start and remain constant throughout
- // the execution.
+ // PROPERTIES (immutable for the stream's lifetime)
// ========================================================================
- /// Aggregation ratio check performed when the number of input rows exceeds
- /// this threshold (from `SessionConfig`)
probe_rows_threshold: usize,
- /// Maximum ratio of `num_groups` to `input_rows` for continuing aggregation
- /// (from `SessionConfig`). If the ratio exceeds this value, aggregation
- /// is skipped and input rows are directly converted to output
probe_ratio_threshold: f64,
+ use_cost_model: bool,
+ ab_sampling_rows: usize,
// ========================================================================
- // STATES:
- // Fields changes during execution. Can be buffer, or state flags that
- // influence the execution in parent `GroupedHashAggregateStream`
+ // STATE
// ========================================================================
- /// Number of processed input rows (updated during probing)
+ phase: ProbePhase,
+ /// Rows processed in the `Partial` phase.
input_rows: usize,
- /// Number of total group values for `input_rows` (updated during probing)
+ /// Latest `group_values.len()` reported in the `Partial` phase.
num_groups: usize,
-
- /// Flag indicating further data aggregation may be skipped (decision made
- /// when probing complete)
+ /// Rows processed in the `AbSampling` phase.
+ ab_rows: usize,
+ /// `elapsed_compute.value()` snapshot at probe construction.
+ elapsed_compute_at_probe_start: usize,
+ /// `elapsed_compute.value()` snapshot at the `Partial`→`AbSampling`
+ /// transition. The partial-window wall time is
+ /// `elapsed_compute_at_ab_start - elapsed_compute_at_probe_start`.
+ elapsed_compute_at_ab_start: Option<usize>,
+ /// `should_skip` and `is_locked` derived from `phase`; kept as
+ /// dedicated fields so the rest of the operator code can stay
+ /// oblivious to the phase enum.
should_skip: bool,
- /// Flag indicating further updates of `SkipAggregationProbe` state won't
- /// make any effect (set either while probing or on probing completion)
is_locked: bool,
// ========================================================================
- // METRICS:
+ // METRICS / SOURCES
// ========================================================================
- /// Number of rows where state was output without aggregation.
- ///
- /// * If 0, all input rows were aggregated (should_skip was always false)
- ///
- /// * if greater than zero, the number of rows which were output directly
- /// without aggregation
skipped_aggregation_rows: metrics::Count,
+ /// Operator-wide `elapsed_compute`; the probe reads `value()` at
+ /// phase transitions to derive per-row costs.
+ elapsed_compute: metrics::Time,
+ /// Operator metrics set + partition — saved so the diagnostic
+ /// gauges below can be lazily registered the first time the probe
+ /// has something useful to report. Queries that never reach
+ /// `probe_rows_threshold` (small inputs, short streams) won't
+ /// register them at all, so EXPLAIN ANALYZE stays clean of empty
+ /// "...=0" noise on workloads where the cost-aware path doesn't
+ /// engage.
+ agg_metrics: metrics::ExecutionPlanMetricsSet,
+ partition: usize,
+ /// Diagnostic gauges, lazily created on first set. See
+ /// [`Self::ensure_probe_gauges`] for the names and categories.
+ probe_partial_ns_per_row: Option<metrics::Gauge>,
+ probe_passthrough_ns_per_row: Option<metrics::Gauge>,
+ probe_ratio_per_mille: Option<metrics::Gauge>,
+ probe_cost_decision_skip: Option<metrics::Gauge>,
}
impl SkipAggregationProbe {
+ #[expect(clippy::too_many_arguments)]
fn new(
probe_rows_threshold: usize,
probe_ratio_threshold: f64,
+ use_cost_model: bool,
+ ab_sampling_rows: usize,
skipped_aggregation_rows: metrics::Count,
+ elapsed_compute: metrics::Time,
+ agg_metrics: metrics::ExecutionPlanMetricsSet,
+ partition: usize,
) -> Self {
+ let elapsed_compute_at_probe_start = elapsed_compute.value();
Self {
- input_rows: 0,
- num_groups: 0,
probe_rows_threshold,
probe_ratio_threshold,
+ use_cost_model,
+ ab_sampling_rows,
+ phase: ProbePhase::Partial,
+ input_rows: 0,
+ num_groups: 0,
+ ab_rows: 0,
+ elapsed_compute_at_probe_start,
+ elapsed_compute_at_ab_start: None,
should_skip: false,
is_locked: false,
skipped_aggregation_rows,
+ elapsed_compute,
+ agg_metrics,
+ partition,
+ probe_partial_ns_per_row: None,
+ probe_passthrough_ns_per_row: None,
+ probe_ratio_per_mille: None,
+ probe_cost_decision_skip: None,
}
}
- /// Updates `SkipAggregationProbe` state:
- /// - increments the number of input rows
- /// - replaces the number of groups with the new value
- /// - on `probe_rows_threshold` exceeded calculates
- /// aggregation ratio and sets `should_skip` flag
- /// - if `should_skip` is set, locks further state updates
- fn update_state(&mut self, input_rows: usize, num_groups: usize) {
- if self.is_locked {
+ /// Lazily register all four cost-aware diagnostic gauges with the
+ /// operator's metric set. Called the first time the probe has data
+ /// to report (i.e. when `finalize_partial_probe` runs). Idempotent:
+ /// once the gauges exist, this is a cheap `Option::is_some` check.
+ /// Small queries that never reach `probe_rows_threshold` skip this
+ /// entirely, so EXPLAIN ANALYZE stays free of "...=0" noise.
+ fn ensure_probe_gauges(&mut self) {
+ if self.probe_partial_ns_per_row.is_some() {
+ return;
+ }
+ self.probe_partial_ns_per_row = Some(
+ MetricBuilder::new(&self.agg_metrics)
+ .with_category(MetricCategory::Timing)
+ .gauge("partial_agg_probe_partial_ns_per_row", self.partition),
+ );
+ self.probe_passthrough_ns_per_row = Some(
+ MetricBuilder::new(&self.agg_metrics)
+ .with_category(MetricCategory::Timing)
+ .gauge("partial_agg_probe_passthrough_ns_per_row", self.partition),
+ );
+ self.probe_ratio_per_mille = Some(
+ MetricBuilder::new(&self.agg_metrics)
+ .with_category(MetricCategory::Rows)
+ .gauge("partial_agg_probe_ratio_per_mille", self.partition),
+ );
+ self.probe_cost_decision_skip = Some(
+ MetricBuilder::new(&self.agg_metrics)
+ .with_category(MetricCategory::Rows)
+ .gauge("partial_agg_probe_cost_decision_skip", self.partition),
+ );
+ }
+
+ /// Called from the partial-agg path after each input batch. Tracks
+ /// total rows / group count and, when `probe_rows_threshold` is
+ /// reached, drives the phase transition.
+ fn observe_partial_batch(&mut self, input_rows: usize, num_groups: usize) {
+ if self.phase != ProbePhase::Partial {
return;
}
self.input_rows += input_rows;
self.num_groups = num_groups;
- if self.input_rows >= self.probe_rows_threshold {
- self.should_skip = self.num_groups as f64 / self.input_rows as f64
- >= self.probe_ratio_threshold;
- // Set is_locked to true only if we have decided to skip, otherwise we can try to skip
- // during processing the next record_batch.
- self.is_locked = self.should_skip;
+ if self.input_rows < self.probe_rows_threshold {
+ return;
+ }
+
+ // Register the diagnostic gauges with the operator's metric set
+ // on first reach — keeps EXPLAIN ANALYZE clean on small workloads
+ // that never engage the cost-aware path.
+ self.ensure_probe_gauges();
+
+ let ratio = self.num_groups as f64 / self.input_rows as f64;
+ let partial_ns = self
+ .elapsed_compute
+ .value()
+ .saturating_sub(self.elapsed_compute_at_probe_start);
+ let partial_ns_per_row = (partial_ns as u64)
+ .checked_div(self.input_rows as u64)
+ .unwrap_or(0) as usize;
Review Comment:
`observe_partial_batch` computes `partial_ns_per_row` from
`elapsed_compute.value()`, but `elapsed_compute` is updated only when the
surrounding `ScopedTimerGuard` is stopped/dropped. If `observe_partial_batch`
is called before the batch's timer guard is stopped, `partial_ns_per_row` will
exclude the current batch's compute time while still counting its rows, biasing
the cost model (and the diagnostic gauge) low. Consider stopping/restarting the
`elapsed_compute` timer guard (or calling `observe_partial_batch` after
`timer.done()` / after the guard is dropped) so the snapshot includes the batch
that triggered probe close.
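
A small sketch of how this bias could change the outcome, assuming the crossover rule quoted in the `ProbePhase` doc comment (skip when `ratio > passthrough_ns/row / partial_ns/row`). The helper names and all numbers are illustrative assumptions, not measurements or code from this PR.

```rust
/// Per-row cost in nanoseconds; 0 when there are no rows (mirrors the
/// `checked_div(..).unwrap_or(0)` pattern in the diff).
fn ns_per_row(elapsed_ns: u64, rows: u64) -> u64 {
    elapsed_ns.checked_div(rows).unwrap_or(0)
}

/// Crossover rule from the `ProbePhase` doc comment:
/// skip when ratio > passthrough_ns/row / partial_ns/row.
/// Assumption for this sketch: with no partial measurement, keep partial agg.
fn should_skip(ratio: f64, partial_ns_per_row: u64, passthrough_ns_per_row: u64) -> bool {
    if partial_ns_per_row == 0 {
        return false;
    }
    ratio > passthrough_ns_per_row as f64 / partial_ns_per_row as f64
}

/// Illustrative scenario: 10 batches of 10_000 rows, 1_000_000 ns each.
/// Returns (decision with the last batch's time included,
///          decision with the last batch's time missing).
fn compare_decisions() -> (bool, bool) {
    let rows = 100_000;
    let full = ns_per_row(10_000_000, rows); // all 10 batches timed
    let biased = ns_per_row(9_000_000, rows); // last batch's time not yet recorded
    let ratio = 0.6;
    let passthrough = 58;
    (
        should_skip(ratio, full, passthrough),
        should_skip(ratio, biased, passthrough),
    )
}
```

With these made-up numbers the complete measurement gives 100 ns/row and a skip decision, while the snapshot that misses the last batch gives 90 ns/row and keeps partial aggregation. An underestimated `partial_ns_per_row` raises the crossover, so the probe becomes less willing to skip.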
##########
datafusion/common/src/config.rs:
##########
@@ -648,6 +648,33 @@ config_namespace! {
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
+ /// (experimental) When true, apply a *secondary* skip rule on top
+ /// of `skip_partial_aggregation_probe_ratio_threshold`: skip
+ /// partial aggregation when the measured ratio is at least
+ /// `skip_partial_aggregation_cost_min_ratio` (default 0.5).
+ /// Targets ClickBench Q18-shape queries where the ratio (~0.56)
+ /// sits just below the fixed 0.8 threshold so partial agg keeps
+ /// running, but the absolute work (heavy variable-length keys,
+ /// complex aggregates) makes it net-negative.
+ ///
+ /// Empirical motivation: lowering the global ratio threshold to
+ /// 0.6 fixes Q18 (1.73× faster) but risks regressing low-cost
+ /// queries at similar ratios. This flag exposes the lower
+ /// threshold as a separate, opt-in knob. Whether the cost-aware
+ /// signal (`partial_agg_probe_ns_per_row` metric) can replace
+ /// this static threshold is an open question — for now the
+ /// metric is reported alongside so callers can evaluate.
+ pub skip_partial_aggregation_use_cost_model: bool, default = true
Review Comment:
This config option's docstring references a
`skip_partial_aggregation_cost_min_ratio` setting and a
`partial_agg_probe_ns_per_row` metric, neither of which exists. The
implementation in `row_hash.rs` uses an A/B sampling window
(`skip_partial_aggregation_ab_sampling_rows`) and records separate
partial/passthrough ns/row gauges, so the docstring should be updated to match
the actual behavior.
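
One possible wording, sketched on a stand-alone struct so that it compiles by itself. `ExecutionOptionsSketch` is a hypothetical stand-in for the real `config_namespace!` entry. The doc text is assembled from the doc comments in `row_hash.rs` in this PR, so it should be checked against the final behaviour before use.

```rust
/// Hypothetical stand-in for the relevant slice of the execution options.
#[derive(Debug, Clone, PartialEq)]
struct ExecutionOptionsSketch {
    /// (experimental) When true, decide whether to skip partial
    /// aggregation from measured per-row costs. After the first
    /// `skip_partial_aggregation_probe_rows_threshold` rows, the next
    /// `skip_partial_aggregation_ab_sampling_rows` rows are routed
    /// through the passthrough path. Partial aggregation is skipped
    /// when the aggregation ratio exceeds
    /// `passthrough_ns/row / partial_ns/row`. The two costs are
    /// reported as the `partial_agg_probe_partial_ns_per_row` and
    /// `partial_agg_probe_passthrough_ns_per_row` gauges. When false,
    /// only the ratio check against
    /// `skip_partial_aggregation_probe_ratio_threshold` is applied.
    skip_partial_aggregation_use_cost_model: bool,
    /// Number of input rows used in the A/B sampling window.
    skip_partial_aggregation_ab_sampling_rows: usize,
}

impl Default for ExecutionOptionsSketch {
    fn default() -> Self {
        Self {
            // Defaults as shown in this PR's config and .slt output.
            skip_partial_aggregation_use_cost_model: true,
            skip_partial_aggregation_ab_sampling_rows: 10_000,
        }
    }
}
```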
##########
docs/source/user-guide/configs.md:
##########
@@ -132,6 +132,8 @@ The following configuration settings are available:
| datafusion.execution.keep_partition_by_columns |
false | Should DataFusion keep the columns used for
partition_by in the output RecordBatches
|
| datafusion.execution.skip_partial_aggregation_probe_ratio_threshold |
0.8 | Aggregation ratio (number of distinct groups /
number of input rows) threshold for skipping partial aggregation. If the value
is greater then partial aggregation will skip aggregation for further input
|
| datafusion.execution.skip_partial_aggregation_probe_rows_threshold |
100000 | Number of input rows partial aggregation partition
should process, before aggregation ratio check and trying to switch to skipping
aggregation mode
|
+| datafusion.execution.skip_partial_aggregation_use_cost_model |
true | (experimental) When true, apply a _secondary_ skip
rule on top of `skip_partial_aggregation_probe_ratio_threshold`: skip partial
aggregation when the measured ratio is at least
`skip_partial_aggregation_cost_min_ratio` (default 0.5). Targets ClickBench
Q18-shape queries where the ratio (~0.56) sits just below the fixed 0.8
threshold so partial agg keeps running, but the absolute work (heavy
variable-length keys, complex aggregates) makes it net-negative. Empirical
motivation: lowering the global ratio threshold to 0.6 fixes Q18 (1.73× faster)
but risks regressing low-cost queries at similar ratios. This flag exposes the
lower threshold as a separate, opt-in knob. Whether the cost-aware signal
(`partial_agg_probe_ns_per_row` metric) can replace this static threshold is an
open question — for now the metric is reported alongside so callers can
evaluate.
|
Review Comment:
The description for `skip_partial_aggregation_use_cost_model` is out of sync
with the implementation: it references a non-existent
`skip_partial_aggregation_cost_min_ratio` setting and a
`partial_agg_probe_ns_per_row` metric, but the implementation uses an A/B
sampling window (`skip_partial_aggregation_ab_sampling_rows`) and records
separate `partial_agg_probe_partial_ns_per_row` /
`partial_agg_probe_passthrough_ns_per_row` gauges.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]