This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new a5f6fbb4cd [branch-52] fix: interval analysis error when have two
filterexec that inner filter proves zero selectivity (#20743) (#20880)
a5f6fbb4cd is described below
commit a5f6fbb4cd89a47e1036986abe201def15542093
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 06:57:13 2026 -0400
[branch-52] fix: interval analysis error when have two filterexec that
inner filter proves zero selectivity (#20743) (#20880)
- Part of https://github.com/apache/datafusion/issues/20855
- Closes https://github.com/apache/datafusion/issues/20742 on branch-52
This PR:
- Backports https://github.com/apache/datafusion/pull/20743 from
@haohuaijin to the branch-52 line
Co-authored-by: Huaijin <[email protected]>
---
.../physical_optimizer/partition_statistics.rs | 24 +++---
datafusion/physical-plan/src/filter.rs | 91 +++++++++++++++++++---
2 files changed, 93 insertions(+), 22 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 36fa95aa9f..00827edc24 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -387,17 +387,17 @@ mod test {
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Exact(0),
- max_value: Precision::Exact(ScalarValue::Null),
- min_value: Precision::Exact(ScalarValue::Null),
- sum_value: Precision::Exact(ScalarValue::Null),
+ max_value: Precision::Exact(ScalarValue::Int32(None)),
+ min_value: Precision::Exact(ScalarValue::Int32(None)),
+ sum_value: Precision::Exact(ScalarValue::Int32(None)),
distinct_count: Precision::Exact(0),
byte_size: Precision::Exact(16),
},
ColumnStatistics {
null_count: Precision::Exact(0),
- max_value: Precision::Exact(ScalarValue::Null),
- min_value: Precision::Exact(ScalarValue::Null),
- sum_value: Precision::Exact(ScalarValue::Null),
+ max_value: Precision::Exact(ScalarValue::Date32(None)),
+ min_value: Precision::Exact(ScalarValue::Date32(None)),
+ sum_value: Precision::Exact(ScalarValue::Date32(None)),
distinct_count: Precision::Exact(0),
byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32)
},
@@ -416,17 +416,17 @@ mod test {
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Exact(0),
- max_value: Precision::Exact(ScalarValue::Null),
- min_value: Precision::Exact(ScalarValue::Null),
- sum_value: Precision::Exact(ScalarValue::Null),
+ max_value: Precision::Exact(ScalarValue::Int32(None)),
+ min_value: Precision::Exact(ScalarValue::Int32(None)),
+ sum_value: Precision::Exact(ScalarValue::Int32(None)),
distinct_count: Precision::Exact(0),
byte_size: Precision::Exact(8),
},
ColumnStatistics {
null_count: Precision::Exact(0),
- max_value: Precision::Exact(ScalarValue::Null),
- min_value: Precision::Exact(ScalarValue::Null),
- sum_value: Precision::Exact(ScalarValue::Null),
+ max_value: Precision::Exact(ScalarValue::Date32(None)),
+ min_value: Precision::Exact(ScalarValue::Date32(None)),
+ sum_value: Precision::Exact(ScalarValue::Date32(None)),
distinct_count: Precision::Exact(0),
byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32)
},
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index f6ed0c91ec..b96b99933c 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -232,6 +232,7 @@ impl FilterExec {
let total_byte_size =
total_byte_size.with_estimated_selectivity(selectivity);
let column_statistics = collect_new_statistics(
+ schema,
&input_stats.column_statistics,
analysis_ctx.boundaries,
);
@@ -637,6 +638,7 @@ impl EmbeddedProjection for FilterExec {
/// is adjusted by using the next/previous value for its data type to convert
/// it into a closed bound.
fn collect_new_statistics(
+ schema: &SchemaRef,
input_column_stats: &[ColumnStatistics],
analysis_boundaries: Vec<ExprBoundaries>,
) -> Vec<ColumnStatistics> {
@@ -653,12 +655,17 @@ fn collect_new_statistics(
},
)| {
let Some(interval) = interval else {
- // If the interval is `None`, we can say that there are no rows:
+ // If the interval is `None`, we can say that there are no rows.
+ // Use a typed null to preserve the column's data type, so that
+ // downstream interval analysis can still intersect intervals
+ // of the same type.
+ let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
+ .unwrap_or(ScalarValue::Null);
return ColumnStatistics {
null_count: Precision::Exact(0),
- max_value: Precision::Exact(ScalarValue::Null),
- min_value: Precision::Exact(ScalarValue::Null),
- sum_value: Precision::Exact(ScalarValue::Null),
+ max_value: Precision::Exact(typed_null.clone()),
+ min_value: Precision::Exact(typed_null.clone()),
+ sum_value: Precision::Exact(typed_null),
distinct_count: Precision::Exact(0),
byte_size: input_column_stats[idx].byte_size,
};
@@ -1351,17 +1358,17 @@ mod tests {
statistics.column_statistics,
vec![
ColumnStatistics {
- min_value: Precision::Exact(ScalarValue::Null),
- max_value: Precision::Exact(ScalarValue::Null),
- sum_value: Precision::Exact(ScalarValue::Null),
+ min_value: Precision::Exact(ScalarValue::Int32(None)),
+ max_value: Precision::Exact(ScalarValue::Int32(None)),
+ sum_value: Precision::Exact(ScalarValue::Int32(None)),
distinct_count: Precision::Exact(0),
null_count: Precision::Exact(0),
byte_size: Precision::Absent,
},
ColumnStatistics {
- min_value: Precision::Exact(ScalarValue::Null),
- max_value: Precision::Exact(ScalarValue::Null),
- sum_value: Precision::Exact(ScalarValue::Null),
+ min_value: Precision::Exact(ScalarValue::Int32(None)),
+ max_value: Precision::Exact(ScalarValue::Int32(None)),
+ sum_value: Precision::Exact(ScalarValue::Int32(None)),
distinct_count: Precision::Exact(0),
null_count: Precision::Exact(0),
byte_size: Precision::Absent,
@@ -1372,6 +1379,70 @@ mod tests {
Ok(())
}
+ /// Regression test: stacking two FilterExecs where the inner filter
+ /// proves zero selectivity should not panic with a type mismatch
+ /// during interval intersection.
+ ///
+ /// Previously, when a filter proved no rows could match, the column
+ /// statistics used untyped `ScalarValue::Null` (data type `Null`).
+ /// If an outer FilterExec then tried to analyze its own predicate
+ /// against those statistics, `Interval::intersect` would fail with:
+ /// "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32"
+ #[tokio::test]
+ async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
+ // Inner table: a: [1, 100], b: [1, 3]
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ ]);
+ let input = Arc::new(StatisticsExec::new(
+ Statistics {
+ num_rows: Precision::Inexact(1000),
+ total_byte_size: Precision::Inexact(4000),
+ column_statistics: vec![
+ ColumnStatistics {
+ min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
+ max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
+ ..Default::default()
+ },
+ ColumnStatistics {
+ min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
+ max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
+ ..Default::default()
+ },
+ ],
+ },
+ schema,
+ ));
+
+ // Inner filter: a > 200 (impossible given a max=100 → zero selectivity)
+ let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 0)),
+ Operator::Gt,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
+ ));
+ let inner_filter: Arc<dyn ExecutionPlan> =
+ Arc::new(FilterExec::try_new(inner_predicate, input)?);
+
+ // Outer filter: a = 50
+ // Before the fix, this would panic because the inner filter's
+ // zero-selectivity statistics produced Null-typed intervals for
+ // column `a`, which couldn't intersect with the Int32 literal.
+ let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 0)),
+ Operator::Eq,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
+ ));
+ let outer_filter: Arc<dyn ExecutionPlan> =
+ Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
+
+ // Should succeed without error
+ let statistics = outer_filter.partition_statistics(None)?;
+ assert_eq!(statistics.num_rows, Precision::Inexact(0));
+
+ Ok(())
+ }
+
#[tokio::test]
async fn test_filter_statistics_more_inputs() -> Result<()> {
let schema = Schema::new(vec![
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]