kosiew commented on code in PR #22752:
URL: https://github.com/apache/datafusion/pull/22752#discussion_r3361085910
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -195,7 +195,7 @@ impl SkipAggregationProbe {
self.num_groups = num_groups;
if self.input_rows >= self.probe_rows_threshold {
self.should_skip = self.num_groups as f64 / self.input_rows as f64
- >= self.probe_ratio_threshold;
+ > self.probe_ratio_threshold;
Review Comment:
Nice fix. One small thought: the `>` boundary is the key invariant here, but
the new end-to-end regression test uses `threshold=1.0`, which now disables
probe construction entirely, so this comparison never runs in that test.
It might be worth adding a focused unit or regression case where
`probe_ratio_threshold < 1.0` and `num_groups / input_rows == threshold`, so we
explicitly pin equality as "do not skip" in
`SkipAggregationProbe::update_state` itself.
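A rough sketch of the kind of boundary test I have in mind is below. `ProbeSketch` is a hypothetical, simplified stand-in and not the real `SkipAggregationProbe`: its constructor, its field set, and the row accumulation in `update_state` are assumptions for illustration. Only the strict `>` comparison is taken from the diff above. The values 5 groups over 10 rows with a threshold of 0.5 are chosen so the ratio is exactly representable in `f64`.

```rust
/// Hypothetical, simplified stand-in for `SkipAggregationProbe`.
/// Only the strict `>` comparison mirrors the diff; the rest is assumed.
struct ProbeSketch {
    probe_rows_threshold: usize,
    probe_ratio_threshold: f64,
    input_rows: usize,
    num_groups: usize,
    should_skip: bool,
}

impl ProbeSketch {
    fn new(probe_rows_threshold: usize, probe_ratio_threshold: f64) -> Self {
        Self {
            probe_rows_threshold,
            probe_ratio_threshold,
            input_rows: 0,
            num_groups: 0,
            should_skip: false,
        }
    }

    /// Accumulates rows (assumed) and re-evaluates the skip decision
    /// once the row threshold has been reached.
    fn update_state(&mut self, input_rows: usize, num_groups: usize) {
        self.input_rows += input_rows;
        self.num_groups = num_groups;
        if self.input_rows >= self.probe_rows_threshold {
            // Strict `>`: a ratio exactly equal to the threshold must not skip.
            self.should_skip = self.num_groups as f64 / self.input_rows as f64
                > self.probe_ratio_threshold;
        }
    }
}

#[test]
fn equality_at_threshold_does_not_skip() {
    let mut probe = ProbeSketch::new(10, 0.5);
    probe.update_state(10, 5); // ratio = 5 / 10 == 0.5 exactly
    assert!(!probe.should_skip);
}

#[test]
fn ratio_above_threshold_skips() {
    let mut probe = ProbeSketch::new(10, 0.5);
    probe.update_state(10, 6); // ratio = 0.6 > 0.5
    assert!(probe.should_skip);
}
```

In the real test the same two cases would be driven through `SkipAggregationProbe::update_state` directly, so the equality case fails if the comparison ever regresses to `>=`.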
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -3791,6 +3791,92 @@ mod tests {
Ok(())
}
+ /// When `skip_partial_aggregation_probe_ratio_threshold` is set to 1.0,
+ /// the feature must be effectively disabled: even with 100% cardinality
+ /// (every row is a unique group), no rows should be skipped.
+ #[tokio::test]
+ async fn test_skip_aggregation_disabled_at_threshold_one() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("key", DataType::Int32, true),
+ Field::new("val", DataType::Int32, true),
+ ]));
+
+ let group_by =
+ PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]);
+
+ let aggr_expr = vec![
+ AggregateExprBuilder::new(count_udaf(), vec![col("val", &schema)?])
+ .schema(Arc::clone(&schema))
+ .alias(String::from("COUNT(val)"))
+ .build()
+ .map(Arc::new)?,
+ ];
+
+ // Two batches are required: batch 1 triggers the probe threshold so the
+ // skip decision is evaluated; batch 2 is what would be skipped on main
+ // (where >= caused threshold=1.0 to still skip at 100% cardinality).
+ // All rows have unique keys => ratio = 1.0 (100% cardinality).
+ let input_data = vec![
+ // Batch 1: fires the probe check (ratio = 5/5 = 1.0)
+ RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
+ Arc::new(Int32Array::from(vec![0, 0, 0, 0, 0])),
+ ],
+ )
+ .unwrap(),
+ // Batch 2: would be skipped if threshold=1.0 did not disable the feature
+ RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(vec![6, 7, 8, 9, 10])),
+ Arc::new(Int32Array::from(vec![0, 0, 0, 0, 0])),
+ ],
+ )
+ .unwrap(),
+ ];
+
+ let input =
+ TestMemoryExec::try_new_exec(&[input_data], Arc::clone(&schema), None)?;
+ let aggregate_exec = Arc::new(AggregateExec::try_new(
+ AggregateMode::Partial,
+ group_by,
+ aggr_expr,
+ vec![None],
+ Arc::clone(&input) as Arc<dyn ExecutionPlan>,
+ schema,
+ )?);
+
+ let mut session_config = SessionConfig::default();
Review Comment:
Tiny cleanup suggestion: this builder-style config setup can be collapsed a
bit without changing behavior:
```rust
let session_config = SessionConfig::default()
.set(
"datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
&ScalarValue::Int64(Some(1)),
)
.set(
"datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
&ScalarValue::Float64(Some(1.0)),
);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]