This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 26251bb37a Use exact distinct_count from statistics if exists for 
`COUNT(DISTINCT column))` calculations (#20845)
26251bb37a is described below

commit 26251bb37add19b5034fef48c9a122d73ce84d50
Author: Burak Şen <[email protected]>
AuthorDate: Mon Mar 16 23:46:00 2026 +0300

    Use exact distinct_count from statistics if exists for `COUNT(DISTINCT 
column))` calculations (#20845)
    
    ## Which issue does this PR close?
    Does not close but part of #20766
    
    ## Rationale for this change
    If `distinct_count` is known for the column from statistics we can
    directly use it in `COUNT(DISTINCT column))` expressions instead of
    calculating.
    
    ## What changes are included in this PR?
    - Return distinct_count value if exists from statistics
    - Unit tests
    
    ## Are these changes tested?
    Added additional tests
    
    
    ## Are there any user-facing changes?
    No
---
 .../physical_optimizer/aggregate_statistics.rs     | 153 ++++++++++++++++++++-
 datafusion/functions-aggregate/src/count.rs        |  49 ++++---
 2 files changed, 181 insertions(+), 21 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs 
b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
index 4218f76fa1..850f9d1877 100644
--- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
@@ -23,16 +23,23 @@ use arrow::array::Int32Array;
 use arrow::array::{Int64Array, StringArray};
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
+use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::memory::MemTable;
 use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::physical_plan::ParquetSource;
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion_common::Result;
 use datafusion_common::assert_batches_eq;
 use datafusion_common::cast::as_int64_array;
 use datafusion_common::config::ConfigOptions;
+use datafusion_common::stats::Precision;
+use datafusion_common::{ColumnStatistics, Result, Statistics};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_execution::TaskContext;
+use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::Operator;
+use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_physical_expr::aggregate::AggregateExprBuilder;
 use datafusion_physical_expr::expressions::{self, cast};
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics;
@@ -402,3 +409,147 @@ async fn utf8_grouping_min_max_limit_fallbacks() -> 
Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_count_distinct_optimization() -> Result<()> {
+    struct TestCase {
+        name: &'static str,
+        distinct_count: Precision<usize>,
+        use_column_expr: bool,
+        expect_optimized: bool,
+        expected_value: Option<i64>,
+    }
+
+    let cases = vec![
+        TestCase {
+            name: "exact statistics",
+            distinct_count: Precision::Exact(42),
+            use_column_expr: true,
+            expect_optimized: true,
+            expected_value: Some(42),
+        },
+        TestCase {
+            name: "absent statistics",
+            distinct_count: Precision::Absent,
+            use_column_expr: true,
+            expect_optimized: false,
+            expected_value: None,
+        },
+        TestCase {
+            name: "inexact statistics",
+            distinct_count: Precision::Inexact(42),
+            use_column_expr: true,
+            expect_optimized: false,
+            expected_value: None,
+        },
+        TestCase {
+            name: "non-column expression with exact statistics",
+            distinct_count: Precision::Exact(42),
+            use_column_expr: false,
+            expect_optimized: false,
+            expected_value: None,
+        },
+    ];
+
+    for case in cases {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+        ]));
+
+        let statistics = Statistics {
+            num_rows: Precision::Exact(100),
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![
+                ColumnStatistics {
+                    distinct_count: case.distinct_count,
+                    null_count: Precision::Exact(10),
+                    ..Default::default()
+                },
+                ColumnStatistics::default(),
+            ],
+        };
+
+        let config = FileScanConfigBuilder::new(
+            ObjectStoreUrl::parse("test:///").unwrap(),
+            Arc::new(ParquetSource::new(Arc::clone(&schema))),
+        )
+        .with_file(PartitionedFile::new("x".to_string(), 100))
+        .with_statistics(statistics)
+        .build();
+
+        let source: Arc<dyn ExecutionPlan> = 
DataSourceExec::from_data_source(config);
+        let schema = source.schema();
+
+        let (agg_args, alias): (Vec<Arc<dyn 
datafusion_physical_expr::PhysicalExpr>>, _) =
+            if case.use_column_expr {
+                (vec![expressions::col("a", &schema)?], "COUNT(DISTINCT a)")
+            } else {
+                (
+                    vec![expressions::binary(
+                        expressions::col("a", &schema)?,
+                        Operator::Plus,
+                        expressions::col("b", &schema)?,
+                        &schema,
+                    )?],
+                    "COUNT(DISTINCT a + b)",
+                )
+            };
+
+        let count_distinct_expr = AggregateExprBuilder::new(count_udaf(), 
agg_args)
+            .schema(Arc::clone(&schema))
+            .alias(alias)
+            .distinct()
+            .build()?;
+
+        let partial_agg = AggregateExec::try_new(
+            AggregateMode::Partial,
+            PhysicalGroupBy::default(),
+            vec![Arc::new(count_distinct_expr.clone())],
+            vec![None],
+            source,
+            Arc::clone(&schema),
+        )?;
+
+        let final_agg = AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::default(),
+            vec![Arc::new(count_distinct_expr)],
+            vec![None],
+            Arc::new(partial_agg),
+            Arc::clone(&schema),
+        )?;
+
+        let conf = ConfigOptions::new();
+        let optimized =
+            AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
+
+        if case.expect_optimized {
+            assert!(
+                optimized.as_any().is::<ProjectionExec>(),
+                "'{}': expected ProjectionExec",
+                case.name
+            );
+
+            if let Some(expected_val) = case.expected_value {
+                let task_ctx = Arc::new(TaskContext::default());
+                let result = common::collect(optimized.execute(0, 
task_ctx)?).await?;
+                assert_eq!(result.len(), 1, "'{}': expected 1 batch", 
case.name);
+                assert_eq!(
+                    as_int64_array(result[0].column(0)).unwrap().values(),
+                    &[expected_val],
+                    "'{}': unexpected value",
+                    case.name
+                );
+            }
+        } else {
+            assert!(
+                optimized.as_any().is::<AggregateExec>(),
+                "'{}': expected AggregateExec (not optimized)",
+                case.name
+            );
+        }
+    }
+
+    Ok(())
+}
diff --git a/datafusion/functions-aggregate/src/count.rs 
b/datafusion/functions-aggregate/src/count.rs
index 376cf39745..ebe3c60a4d 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -365,31 +365,40 @@ impl AggregateUDFImpl for Count {
     }
 
     fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> 
Option<ScalarValue> {
+        let [expr] = statistics_args.exprs else {
+            return None;
+        };
+        let col_stats = &statistics_args.statistics.column_statistics;
+
         if statistics_args.is_distinct {
+            // Only column references can be resolved from statistics;
+            // expressions like casts or literals are not supported.
+            let col_expr = 
expr.as_any().downcast_ref::<expressions::Column>()?;
+            if let Precision::Exact(dc) = 
col_stats[col_expr.index()].distinct_count {
+                let dc = i64::try_from(dc).ok()?;
+                return Some(ScalarValue::Int64(Some(dc)));
+            }
             return None;
         }
-        if let Precision::Exact(num_rows) = statistics_args.statistics.num_rows
-            && statistics_args.exprs.len() == 1
-        {
-            // TODO optimize with exprs other than Column
-            if let Some(col_expr) = statistics_args.exprs[0]
-                .as_any()
-                .downcast_ref::<expressions::Column>()
-            {
-                let current_val = &statistics_args.statistics.column_statistics
-                    [col_expr.index()]
-                .null_count;
-                if let &Precision::Exact(val) = current_val {
-                    return Some(ScalarValue::Int64(Some((num_rows - val) as 
i64)));
-                }
-            } else if let Some(lit_expr) = statistics_args.exprs[0]
-                .as_any()
-                .downcast_ref::<expressions::Literal>()
-                && lit_expr.value() == &COUNT_STAR_EXPANSION
-            {
-                return Some(ScalarValue::Int64(Some(num_rows as i64)));
+
+        let Precision::Exact(num_rows) = statistics_args.statistics.num_rows 
else {
+            return None;
+        };
+
+        // TODO optimize with exprs other than Column
+        if let Some(col_expr) = 
expr.as_any().downcast_ref::<expressions::Column>() {
+            if let Precision::Exact(val) = 
col_stats[col_expr.index()].null_count {
+                let count = i64::try_from(num_rows - val).ok()?;
+                return Some(ScalarValue::Int64(Some(count)));
             }
+        } else if let Some(lit_expr) =
+            expr.as_any().downcast_ref::<expressions::Literal>()
+            && lit_expr.value() == &COUNT_STAR_EXPANSION
+        {
+            let num_rows = i64::try_from(num_rows).ok()?;
+            return Some(ScalarValue::Int64(Some(num_rows)));
         }
+
         None
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to