This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7014a450cc feat: Extract NDV (distinct_count) statistics from Parquet metadata (#19957)
7014a450cc is described below
commit 7014a450cc2b659660cc92192a46cf86b76d2fa9
Author: Alessandro Solimando <[email protected]>
AuthorDate: Thu Mar 19 11:23:16 2026 +0100
feat: Extract NDV (distinct_count) statistics from Parquet metadata (#19957)
## Which issue does this PR close?
- Part of #15265
Related: #18628, #8227
(I am not sure whether a new issue scoped to this PR is needed; happy to
create one if so.)
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
This work originates from a discussion in datafusion-distributed about
improving the `TaskEstimator` API:
https://github.com/datafusion-contrib/datafusion-distributed/issues/296#issuecomment-3777726928
We agreed that improved statistics support in DataFusion would benefit
both projects. For distributed-datafusion, better cardinality estimation
helps decide how to split computation across network boundaries.
This also benefits DataFusion directly, since cost-based optimization
already consumes these statistics: for example, join cardinality estimation
([`joins/utils.rs:586-646`](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/utils.rs#L586-L646))
uses `distinct_count` via `max_distinct_count` to compute join
selectivity.
Currently this field is always `Absent` when reading from Parquet; this
PR fills that gap.
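As a concrete illustration of why NDV matters here: the textbook equi-join cardinality estimate divides the cross product by the larger key NDV. Below is a minimal standalone sketch of that idea; the function name and shape are illustrative, not the DataFusion API (the real logic lives in `joins/utils.rs` and operates on `Statistics`/`Precision`).

```rust
// Hypothetical standalone sketch of NDV-driven join cardinality estimation.
fn estimate_inner_join_rows(
    left_rows: usize,
    right_rows: usize,
    ndv_left: Option<usize>,
    ndv_right: Option<usize>,
) -> Option<usize> {
    // Selectivity of an equi-join key is roughly 1 / max(NDV_l, NDV_r);
    // with no NDV on either side there is nothing to estimate from.
    let max_ndv = match (ndv_left, ndv_right) {
        (Some(l), Some(r)) => l.max(r),
        (Some(l), None) => l,
        (None, Some(r)) => r,
        (None, None) => return None,
    };
    if max_ndv == 0 {
        return Some(0); // degenerate: an empty key domain joins nothing
    }
    Some(left_rows * right_rows / max_ndv)
}
```

With 1,000 x 10,000 rows and key NDVs of 100 and 500, the estimate is 20,000 joined rows; with `distinct_count` always `Absent`, this estimate is simply unavailable.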
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
Commit 1 - Reading NDV from Parquet files:
- Extract `distinct_count` from Parquet row group column statistics
- Single row group with NDV -> `Precision::Exact(ndv)`
- Multiple row groups with NDV -> `Precision::Inexact(max)` as
conservative lower bound
- No NDV available -> `Precision::Absent`
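The folding rule above can be sketched as a standalone function. The `Ndv` enum below is a simplified stand-in for DataFusion's `Precision<usize>`, and the sketch follows only the three bullets as stated:

```rust
/// Simplified stand-in for DataFusion's `Precision<usize>`.
#[derive(Debug, PartialEq)]
enum Ndv {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Folds per-row-group distinct counts (`None` = not reported by that
/// row group) into one file-level estimate, following the bullets above.
fn fold_row_group_ndv(per_row_group: &[Option<u64>]) -> Ndv {
    let reported: Vec<u64> = per_row_group.iter().flatten().copied().collect();
    match (reported.as_slice(), per_row_group.len()) {
        // One row group reporting NDV covers the whole file exactly.
        ([only], 1) => Ndv::Exact(*only as usize),
        // No row group reports NDV at all.
        ([], _) => Ndv::Absent,
        // Several row groups: values may repeat across groups, so the
        // largest per-group NDV is only a conservative lower bound.
        (counts, _) => Ndv::Inexact(*counts.iter().max().unwrap() as usize),
    }
}
```

Note that the actual change additionally requires a minimum fraction of row groups to report NDV (`PARTIAL_NDV_THRESHOLD` in `metadata.rs`) before emitting an estimate; this sketch omits that gate.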
Commit 2 - Statistics propagation (can be split into a separate PR, if
preferred):
- `Statistics::try_merge()`: use max as conservative lower bound instead
of discarding NDV
- `Projection`: preserve NDV for single-column expressions as upper
bound
I'm including the second commit to showcase how I intend to use the
statistics, but these changes can be moved to a follow-up PR to keep the
review scope limited.
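When min/max ranges are available, the merge can do better than a plain max by apportioning each input's NDV across the overlapping and non-overlapping parts of the two ranges (under a uniformity assumption). Here is a self-contained sketch of that estimate over plain `f64` ranges; names are illustrative, and the real code operates on `ColumnStatistics` min/max `ScalarValue`s with a max-based fallback when distances are unavailable:

```rust
// Illustrative range-overlap NDV merge (uniform-distribution assumption).
fn merge_ndv_with_overlap(
    (min_a, max_a, ndv_a): (f64, f64, usize),
    (min_b, max_b, ndv_b): (f64, f64, usize),
) -> usize {
    let range_a = max_a - min_a;
    let range_b = max_b - min_b;
    // Constant columns break the proportional formula; compare intervals directly.
    if range_a == 0.0 || range_b == 0.0 {
        return if min_a <= max_b && min_b <= max_a {
            ndv_a.max(ndv_b) // the shared point counts once
        } else {
            ndv_a + ndv_b // disjoint constants are distinct
        };
    }
    // Width of the intersection of the two ranges (0 when disjoint).
    let overlap = (max_a.min(max_b) - min_a.max(min_b)).max(0.0);
    // Under uniformity, the fraction of a range covered by the overlap is
    // also the fraction of that input's distinct values falling there.
    let frac_a = overlap / range_a;
    let frac_b = overlap / range_b;
    // Shared region: assume the smaller set is a subset of the larger.
    let intersection = (frac_a * ndv_a as f64).max(frac_b * ndv_b as f64);
    let only_a = (1.0 - frac_a) * ndv_a as f64;
    let only_b = (1.0 - frac_b) * ndv_b as f64;
    (intersection + only_a + only_b).round() as usize
}
```

For example, merging range `[1,10]` with NDV 5 and range `[5,20]` with NDV 7 yields 10, matching the `Precision::Inexact(10)` assertion in the new unit test.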
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Yes, 7 unit tests are added for NDV extraction:
- Single/multiple row groups with NDV
- Partial NDV availability across row groups
- Multiple columns with different NDV values
- Integration test reading a real Parquet file with distinct_count
statistics (following the pattern in
[`row_filter.rs:685-696`](https://github.com/apache/datafusion/blob/main/datafusion/datasource-parquet/src/row_filter.rs#L685-L696),
using `parquet_to_arrow_schema` to derive the schema from the file)
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
No breaking changes. Statistics consumers will now see populated
`distinct_count` values when available in Parquet metadata.
Disclaimer: I used AI (Claude Code) to assist translating my ideas into
code as I am still ramping up with the codebase and especially with Rust
(guidance on both aspects is highly appreciated). I have a good
understanding of the core concepts (statistics, CBO etc.) and have
carefully double-checked that the PR matches my intentions and
understanding.
cc: @gabotechs @jayshrivastava @NGA-TRAN @gene-bordegaray
---
datafusion/common/src/stats.rs | 363 +++++++++++++-
.../physical_optimizer/partition_statistics.rs | 27 +-
datafusion/datasource-parquet/src/metadata.rs | 542 ++++++++++++++++++---
.../src/test_data/ndv_test.parquet | Bin 0 -> 1141 bytes
datafusion/physical-plan/src/union.rs | 96 +---
5 files changed, 858 insertions(+), 170 deletions(-)
diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index 36dab2a1ea..f263c905fa 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -665,7 +665,7 @@ impl Statistics {
max_value: cs.max_value.clone(),
min_value: cs.min_value.clone(),
sum_value: cs.sum_value.clone(),
- distinct_count: Precision::Absent,
+ distinct_count: cs.distinct_count,
byte_size: cs.byte_size,
})
.collect();
@@ -679,11 +679,24 @@ impl Statistics {
let item_cs = &stat.column_statistics[col_idx];
col_stats.null_count =
col_stats.null_count.add(&item_cs.null_count);
- col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size);
- col_stats.sum_value =
- precision_add(&col_stats.sum_value, &item_cs.sum_value);
+
+ // NDV must be computed before min/max update (needs pre-merge ranges)
+ col_stats.distinct_count = match (
+ col_stats.distinct_count.get_value(),
+ item_cs.distinct_count.get_value(),
+ ) {
+ (Some(&l), Some(&r)) => Precision::Inexact(
+ estimate_ndv_with_overlap(col_stats, item_cs, l, r)
+ .unwrap_or_else(|| usize::max(l, r)),
+ ),
+ _ => Precision::Absent,
+ };
+
col_stats.min_value =
col_stats.min_value.min(&item_cs.min_value);
col_stats.max_value =
col_stats.max_value.max(&item_cs.max_value);
+ col_stats.sum_value =
+ precision_add(&col_stats.sum_value, &item_cs.sum_value);
+ col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size);
}
}
@@ -695,6 +708,96 @@ impl Statistics {
}
}
+/// Estimates the combined number of distinct values (NDV) when merging two
+/// column statistics, using range overlap to avoid double-counting shared values.
+///
+/// Assumes values are distributed uniformly within each input's
+/// `[min, max]` range (the standard assumption when only summary
+/// statistics are available). Under uniformity the fraction of an input's
+/// distinct values that land in a sub-range equals the fraction of
+/// the range that sub-range covers.
+///
+/// The combined value space is split into three disjoint regions:
+///
+/// ```text
+/// |-- only A --|-- overlap --|-- only B --|
+/// ```
+///
+/// * **Only in A/B** - values outside the other input's range
+/// contribute `(1 - overlap_a) * NDV_a` and `(1 - overlap_b) * NDV_b`.
+/// * **Overlap** - both inputs may produce values here. We take
+/// `max(overlap_a * NDV_a, overlap_b * NDV_b)` rather than the
+/// sum because values in the same sub-range are likely shared
+/// (the smaller set is assumed to be a subset of the larger).
+///
+/// The formula ranges between `[max(NDV_a, NDV_b), NDV_a + NDV_b]`,
+/// from full overlap to no overlap.
+///
+/// ```text
+/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b) [intersection]
+/// + (1 - overlap_a) * NDV_a [only in A]
+/// + (1 - overlap_b) * NDV_b [only in B]
+/// ```
+///
+/// Returns `None` when min/max are absent or distance is unsupported
+/// (e.g. strings), in which case the caller should fall back to a simpler
+/// estimate.
+pub fn estimate_ndv_with_overlap(
+ left: &ColumnStatistics,
+ right: &ColumnStatistics,
+ ndv_left: usize,
+ ndv_right: usize,
+) -> Option<usize> {
+ let left_min = left.min_value.get_value()?;
+ let left_max = left.max_value.get_value()?;
+ let right_min = right.min_value.get_value()?;
+ let right_max = right.max_value.get_value()?;
+
+ let range_left = left_max.distance(left_min)?;
+ let range_right = right_max.distance(right_min)?;
+
+ // Constant columns (range == 0) can't use the proportional overlap
+ // formula below, so check interval overlap directly instead.
+ if range_left == 0 || range_right == 0 {
+ let overlaps = left_min <= right_max && right_min <= left_max;
+ return Some(if overlaps {
+ usize::max(ndv_left, ndv_right)
+ } else {
+ ndv_left + ndv_right
+ });
+ }
+
+ let overlap_min = if left_min >= right_min {
+ left_min
+ } else {
+ right_min
+ };
+ let overlap_max = if left_max <= right_max {
+ left_max
+ } else {
+ right_max
+ };
+
+ // Disjoint ranges: no overlap, NDVs are additive
+ if overlap_min > overlap_max {
+ return Some(ndv_left + ndv_right);
+ }
+
+ let overlap_range = overlap_max.distance(overlap_min)? as f64;
+
+ let overlap_left = overlap_range / range_left as f64;
+ let overlap_right = overlap_range / range_right as f64;
+
+ let intersection = f64::max(
+ overlap_left * ndv_left as f64,
+ overlap_right * ndv_right as f64,
+ );
+ let only_left = (1.0 - overlap_left) * ndv_left as f64;
+ let only_right = (1.0 - overlap_right) * ndv_right as f64;
+
+ Some((intersection + only_left + only_right).round() as usize)
+}
+
/// Creates an estimate of the number of rows in the output using the given
/// optional value and exactness flag.
fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
@@ -1361,6 +1464,253 @@ mod tests {
);
}
+ #[test]
+ fn test_try_merge_distinct_count_absent() {
+ // Create statistics with known distinct counts
+ let stats1 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .with_total_byte_size(Precision::Exact(100))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_null_count(Precision::Exact(0))
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(1))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
+ .with_distinct_count(Precision::Exact(5)),
+ );
+
+ let stats2 = Statistics::default()
+ .with_num_rows(Precision::Exact(15))
+ .with_total_byte_size(Precision::Exact(150))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_null_count(Precision::Exact(0))
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(20))))
+ .with_distinct_count(Precision::Exact(7)),
+ );
+
+ // Merge statistics
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+ let merged_stats =
+ Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+
+ // Verify the results
+ assert_eq!(merged_stats.num_rows, Precision::Exact(25));
+ assert_eq!(merged_stats.total_byte_size, Precision::Exact(250));
+
+ let col_stats = &merged_stats.column_statistics[0];
+ assert_eq!(col_stats.null_count, Precision::Exact(0));
+ assert_eq!(
+ col_stats.min_value,
+ Precision::Exact(ScalarValue::Int32(Some(1)))
+ );
+ assert_eq!(
+ col_stats.max_value,
+ Precision::Exact(ScalarValue::Int32(Some(20)))
+ );
+ // Overlap-based NDV: ranges [1,10] and [5,20], overlap [5,10]
+ // range_left=9, range_right=15, overlap=5
+ // overlap_left=5*(5/9)=2.78, overlap_right=7*(5/15)=2.33
+ // result = max(2.78, 2.33) + (5-2.78) + (7-2.33) = 9.67 -> 10
+ assert_eq!(col_stats.distinct_count, Precision::Inexact(10));
+ }
+
+ #[test]
+ fn test_try_merge_ndv_disjoint_ranges() {
+ let stats1 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
+ .with_distinct_count(Precision::Exact(5)),
+ );
+ let stats2 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(20))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(30))))
+ .with_distinct_count(Precision::Exact(8)),
+ );
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+ let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+ // No overlap -> sum of NDVs
+ assert_eq!(
+ merged.column_statistics[0].distinct_count,
+ Precision::Inexact(13)
+ );
+ }
+
+ #[test]
+ fn test_try_merge_ndv_identical_ranges() {
+ let stats1 = Statistics::default()
+ .with_num_rows(Precision::Exact(100))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
+ .with_distinct_count(Precision::Exact(50)),
+ );
+ let stats2 = Statistics::default()
+ .with_num_rows(Precision::Exact(100))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
+ .with_distinct_count(Precision::Exact(30)),
+ );
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+ let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+ // Full overlap -> max(50, 30) = 50
+ assert_eq!(
+ merged.column_statistics[0].distinct_count,
+ Precision::Inexact(50)
+ );
+ }
+
+ #[test]
+ fn test_try_merge_ndv_partial_overlap() {
+ let stats1 = Statistics::default()
+ .with_num_rows(Precision::Exact(100))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
+ .with_distinct_count(Precision::Exact(80)),
+ );
+ let stats2 = Statistics::default()
+ .with_num_rows(Precision::Exact(100))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(150))))
+ .with_distinct_count(Precision::Exact(60)),
+ );
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+ let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+ // overlap=[50,100], range_left=100, range_right=100, overlap_range=50
+ // overlap_left=80*(50/100)=40, overlap_right=60*(50/100)=30
+ // result = max(40,30) + (80-40) + (60-30) = 40 + 40 + 30 = 110
+ assert_eq!(
+ merged.column_statistics[0].distinct_count,
+ Precision::Inexact(110)
+ );
+ }
+
+ #[test]
+ fn test_try_merge_ndv_missing_min_max() {
+ let stats1 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(5)),
+ );
+ let stats2 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(8)),
+ );
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+ let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+ // No min/max -> fallback to max(5, 8)
+ assert_eq!(
+ merged.column_statistics[0].distinct_count,
+ Precision::Inexact(8)
+ );
+ }
+
+ #[test]
+ fn test_try_merge_ndv_non_numeric_types() {
+ let stats1 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
+ "aaa".to_string(),
+ ))))
+ .with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
+ "zzz".to_string(),
+ ))))
+ .with_distinct_count(Precision::Exact(5)),
+ );
+ let stats2 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
+ "bbb".to_string(),
+ ))))
+ .with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
+ "yyy".to_string(),
+ ))))
+ .with_distinct_count(Precision::Exact(8)),
+ );
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
+ let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+ // distance() unsupported for strings -> fallback to max
+ assert_eq!(
+ merged.column_statistics[0].distinct_count,
+ Precision::Inexact(8)
+ );
+ }
+
+ #[test]
+ fn test_try_merge_ndv_constant_columns() {
+ // Same constant: [5,5]+[5,5] -> max
+ let stats1 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+ .with_distinct_count(Precision::Exact(1)),
+ );
+ let stats2 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+ .with_distinct_count(Precision::Exact(1)),
+ );
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+ let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+ assert_eq!(
+ merged.column_statistics[0].distinct_count,
+ Precision::Inexact(1)
+ );
+
+ // Different constants: [5,5]+[10,10] -> sum
+ let stats3 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+ .with_distinct_count(Precision::Exact(1)),
+ );
+ let stats4 = Statistics::default()
+ .with_num_rows(Precision::Exact(10))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_min_value(Precision::Exact(ScalarValue::Int32(Some(10))))
+ .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
+ .with_distinct_count(Precision::Exact(1)),
+ );
+
+ let merged = Statistics::try_merge_iter([&stats3, &stats4], &schema).unwrap();
+ assert_eq!(
+ merged.column_statistics[0].distinct_count,
+ Precision::Inexact(2)
+ );
+ }
+
#[test]
fn test_with_fetch_basic_preservation() {
// Test that column statistics and byte size are preserved (as inexact) when applying fetch
@@ -2005,8 +2355,9 @@ mod tests {
Precision::Exact(ScalarValue::Int64(Some(3500)))
);
assert_eq!(col_stats.byte_size, Precision::Exact(480));
- // distinct_count is always Absent after merge (can't accurately merge NDV)
- assert_eq!(col_stats.distinct_count, Precision::Absent);
+ // Overlap-based NDV merge (pairwise left-to-right):
+ // stats1+stats2: [10,100]+[5,200] -> NDV=16, then +stats3: [5,200]+[1,150] -> NDV=29
+ assert_eq!(col_stats.distinct_count, Precision::Inexact(29));
}
#[test]
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 12ce141b47..42c1e84534 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -154,13 +154,15 @@ mod test {
// - null_count = 0 (partition values from paths are never null)
// - min/max are the merged partition values across files in the group
// - byte_size = num_rows * 4 (Date32 is 4 bytes per row)
+ // - distinct_count = Inexact(1) per partition file (single partition value per file),
+ //   preserved via max() when merging stats across partitions
let date32_byte_size = num_rows * 4;
column_stats.push(ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Date32(Some(max_date))),
min_value: Precision::Exact(ScalarValue::Date32(Some(min_date))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Exact(date32_byte_size),
});
}
@@ -581,7 +583,7 @@ mod test {
max_value: Precision::Exact(ScalarValue::Date32(Some(20151))),
min_value: Precision::Exact(ScalarValue::Date32(Some(20148))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Absent,
},
// column 2: right.id (Int32, file column from t2) - right
partition 0: ids [3,4]
@@ -615,7 +617,7 @@ mod test {
max_value: Precision::Exact(ScalarValue::Date32(Some(20151))),
min_value: Precision::Exact(ScalarValue::Date32(Some(20148))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Absent,
},
// column 2: right.id (Int32, file column from t2) - right
partition 1: ids [1,2]
@@ -1251,7 +1253,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Exact(8),
},
ColumnStatistics::new_unknown(), // window column
@@ -1279,7 +1281,7 @@ mod test {
DATE_2025_03_03,
))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Exact(8),
},
ColumnStatistics::new_unknown(), // window column
@@ -1416,6 +1418,8 @@ mod test {
byte_size: Precision::Exact(16),
},
// Left date column: all partitions (2025-03-01..2025-03-04)
+ // NDV is Inexact(1) because each Hive partition has exactly 1 distinct date value,
+ // and merging takes max as a conservative lower bound
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Date32(Some(
@@ -1425,7 +1429,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Exact(16),
},
// Right id column: partition 0 only (id 3..4)
@@ -1438,6 +1442,7 @@ mod test {
byte_size: Precision::Exact(8),
},
// Right date column: partition 0 only (2025-03-01..2025-03-02)
+ // NDV is Inexact(1) from the single Hive partition's date value
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Date32(Some(
@@ -1447,7 +1452,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Exact(8),
},
],
@@ -1499,7 +1504,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Exact(8),
},
// Right id column: partition 0 only (id 3..4)
@@ -1521,7 +1526,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Exact(8),
},
],
@@ -1573,7 +1578,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Exact(16),
},
// Right id column: all partitions (id 1..4)
@@ -1595,7 +1600,7 @@ mod test {
DATE_2025_03_01,
))),
sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
+ distinct_count: Precision::Inexact(1),
byte_size: Precision::Exact(16),
},
],
diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs
index 5a4c0bcdd5..e5781ad68d 100644
--- a/datafusion/datasource-parquet/src/metadata.rs
+++ b/datafusion/datasource-parquet/src/metadata.rs
@@ -52,6 +52,11 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
+/// Minimum fraction of row groups that must report NDV statistics for the
+/// merged result to be `Inexact` rather than `Absent`, as the estimate
+/// would be too unreliable otherwise.
+const PARTIAL_NDV_THRESHOLD: f64 = 0.75;
+
/// Handles fetching Parquet file schema, metadata and statistics
/// from object store.
///
@@ -297,6 +302,8 @@ impl<'a> DFParquetMetadata<'a> {
vec![Some(true); logical_file_schema.fields().len()];
let mut is_min_value_exact =
vec![Some(true); logical_file_schema.fields().len()];
+ let mut distinct_counts_array =
+ vec![Precision::Absent; logical_file_schema.fields().len()];
logical_file_schema.fields().iter().enumerate().for_each(
|(idx, field)| match StatisticsConverter::try_new(
field.name(),
@@ -311,8 +318,9 @@ impl<'a> DFParquetMetadata<'a> {
is_min_value_exact: &mut is_min_value_exact,
is_max_value_exact: &mut is_max_value_exact,
column_byte_sizes: &mut column_byte_sizes,
+ distinct_counts_array: &mut distinct_counts_array,
};
- summarize_min_max_null_counts(
+ summarize_column_statistics(
file_metadata.schema_descr(),
logical_file_schema,
&physical_file_schema,
@@ -330,15 +338,16 @@ impl<'a> DFParquetMetadata<'a> {
},
);
- get_col_stats(
- logical_file_schema,
- &null_counts_array,
- &mut max_accs,
- &mut min_accs,
- &mut is_max_value_exact,
- &mut is_min_value_exact,
- &column_byte_sizes,
- )
+ let mut accumulators = StatisticsAccumulators {
+ min_accs: &mut min_accs,
+ max_accs: &mut max_accs,
+ null_counts_array: &mut null_counts_array,
+ is_min_value_exact: &mut is_min_value_exact,
+ is_max_value_exact: &mut is_max_value_exact,
+ column_byte_sizes: &mut column_byte_sizes,
+ distinct_counts_array: &mut distinct_counts_array,
+ };
+ accumulators.build_column_statistics(logical_file_schema)
} else {
// Record column sizes
logical_file_schema
@@ -411,53 +420,6 @@ fn create_max_min_accs(
(max_values, min_values)
}
-fn get_col_stats(
- schema: &Schema,
- null_counts: &[Precision<usize>],
- max_values: &mut [Option<MaxAccumulator>],
- min_values: &mut [Option<MinAccumulator>],
- is_max_value_exact: &mut [Option<bool>],
- is_min_value_exact: &mut [Option<bool>],
- column_byte_sizes: &[Precision<usize>],
-) -> Vec<ColumnStatistics> {
- (0..schema.fields().len())
- .map(|i| {
- let max_value = match (
- max_values.get_mut(i).unwrap(),
- is_max_value_exact.get(i).unwrap(),
- ) {
- (Some(max_value), Some(true)) => {
- max_value.evaluate().ok().map(Precision::Exact)
- }
- (Some(max_value), Some(false)) | (Some(max_value), None) => {
- max_value.evaluate().ok().map(Precision::Inexact)
- }
- (None, _) => None,
- };
- let min_value = match (
- min_values.get_mut(i).unwrap(),
- is_min_value_exact.get(i).unwrap(),
- ) {
- (Some(min_value), Some(true)) => {
- min_value.evaluate().ok().map(Precision::Exact)
- }
- (Some(min_value), Some(false)) | (Some(min_value), None) => {
- min_value.evaluate().ok().map(Precision::Inexact)
- }
- (None, _) => None,
- };
- ColumnStatistics {
- null_count: null_counts[i],
- max_value: max_value.unwrap_or(Precision::Absent),
- min_value: min_value.unwrap_or(Precision::Absent),
- sum_value: Precision::Absent,
- distinct_count: Precision::Absent,
- byte_size: column_byte_sizes[i],
- }
- })
- .collect()
-}
-
/// Holds the accumulator state for collecting statistics from row groups
struct StatisticsAccumulators<'a> {
min_accs: &'a mut [Option<MinAccumulator>],
@@ -466,9 +428,52 @@ struct StatisticsAccumulators<'a> {
is_min_value_exact: &'a mut [Option<bool>],
is_max_value_exact: &'a mut [Option<bool>],
column_byte_sizes: &'a mut [Precision<usize>],
+ distinct_counts_array: &'a mut [Precision<usize>],
}
-fn summarize_min_max_null_counts(
+impl StatisticsAccumulators<'_> {
+ /// Converts the accumulated statistics into a vector of `ColumnStatistics`
+ fn build_column_statistics(&mut self, schema: &Schema) -> Vec<ColumnStatistics> {
+ (0..schema.fields().len())
+ .map(|i| {
+ let max_value = match (
+ self.max_accs.get_mut(i).unwrap(),
+ self.is_max_value_exact.get(i).unwrap(),
+ ) {
+ (Some(max_value), Some(true)) => {
+ max_value.evaluate().ok().map(Precision::Exact)
+ }
+ (Some(max_value), Some(false)) | (Some(max_value), None) => {
+ max_value.evaluate().ok().map(Precision::Inexact)
+ }
+ (None, _) => None,
+ };
+ let min_value = match (
+ self.min_accs.get_mut(i).unwrap(),
+ self.is_min_value_exact.get(i).unwrap(),
+ ) {
+ (Some(min_value), Some(true)) => {
+ min_value.evaluate().ok().map(Precision::Exact)
+ }
+ (Some(min_value), Some(false)) | (Some(min_value), None) => {
+ min_value.evaluate().ok().map(Precision::Inexact)
+ }
+ (None, _) => None,
+ };
+ ColumnStatistics {
+ null_count: self.null_counts_array[i],
+ max_value: max_value.unwrap_or(Precision::Absent),
+ min_value: min_value.unwrap_or(Precision::Absent),
+ sum_value: Precision::Absent,
+ distinct_count: self.distinct_counts_array[i],
+ byte_size: self.column_byte_sizes[i],
+ }
+ })
+ .collect()
+ }
+}
+
+fn summarize_column_statistics(
parquet_schema: &SchemaDescriptor,
logical_file_schema: &Schema,
physical_file_schema: &Schema,
@@ -541,6 +546,39 @@ fn summarize_min_max_null_counts(
)
.map(|(idx, _)| idx);
+ // Extract distinct counts from row group column statistics
+ accumulators.distinct_counts_array[logical_schema_index] =
+ if let Some(parquet_idx) = parquet_index {
+ let num_row_groups = row_groups_metadata.len();
+ let distinct_counts: Vec<u64> = row_groups_metadata
+ .iter()
+ .filter_map(|rg| {
+ rg.columns()
+ .get(parquet_idx)
+ .and_then(|col| col.statistics())
+ .and_then(|stats| stats.distinct_count_opt())
+ })
+ .collect();
+
+ let coverage = distinct_counts.len() as f64 / num_row_groups.max(1) as f64;
+
+ if coverage < PARTIAL_NDV_THRESHOLD {
+ Precision::Absent
+ } else if distinct_counts.len() == 1 && num_row_groups == 1 {
+ // Single row group with distinct count - use exact value
+ Precision::Exact(distinct_counts[0] as usize)
+ } else {
+ // Multiple row groups - use max as a lower bound estimate
+ // (can't accurately merge NDV since duplicates may exist across row groups)
+ match distinct_counts.iter().max() {
+ Some(&max_ndv) => Precision::Inexact(max_ndv as usize),
+ None => Precision::Absent,
+ }
+ }
+ } else {
+ Precision::Absent
+ };
+
let arrow_field = logical_file_schema.field(logical_schema_index);
accumulators.column_byte_sizes[logical_schema_index] =
compute_arrow_column_size(
arrow_field.data_type(),
@@ -805,4 +843,392 @@ mod tests {
assert_eq!(result, Some(false));
}
}
+
+ mod ndv_tests {
+ use super::*;
+ use arrow::datatypes::Field;
+ use parquet::arrow::parquet_to_arrow_schema;
+ use parquet::basic::Type as PhysicalType;
+ use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
+ use parquet::file::reader::{FileReader, SerializedFileReader};
+ use parquet::file::statistics::Statistics as ParquetStatistics;
+ use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
+ use std::fs::File;
+ use std::path::PathBuf;
+
+ fn create_schema_descr(num_columns: usize) -> Arc<SchemaDescriptor> {
+ let fields: Vec<Arc<SchemaType>> = (0..num_columns)
+ .map(|i| {
+ Arc::new(
+ SchemaType::primitive_type_builder(
+ &format!("col_{i}"),
+ PhysicalType::INT32,
+ )
+ .build()
+ .unwrap(),
+ )
+ })
+ .collect();
+
+ let schema = SchemaType::group_type_builder("schema")
+ .with_fields(fields)
+ .build()
+ .unwrap();
+
+ Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+ }
+
+ fn create_arrow_schema(num_columns: usize) -> SchemaRef {
+ let fields: Vec<Field> = (0..num_columns)
+ .map(|i| Field::new(format!("col_{i}"), DataType::Int32, true))
+ .collect();
+ Arc::new(Schema::new(fields))
+ }
+
+ fn create_row_group_with_stats(
+ schema_descr: &Arc<SchemaDescriptor>,
+ column_stats: Vec<Option<ParquetStatistics>>,
+ num_rows: i64,
+ ) -> RowGroupMetaData {
+ let columns: Vec<ColumnChunkMetaData> = column_stats
+ .into_iter()
+ .enumerate()
+ .map(|(i, stats)| {
+ let mut builder =
+ ColumnChunkMetaData::builder(schema_descr.column(i));
+ if let Some(s) = stats {
+ builder = builder.set_statistics(s);
+ }
+ builder.set_num_values(num_rows).build().unwrap()
+ })
+ .collect();
+
+ RowGroupMetaData::builder(schema_descr.clone())
+ .set_num_rows(num_rows)
+ .set_total_byte_size(1000)
+ .set_column_metadata(columns)
+ .build()
+ .unwrap()
+ }
+
+ fn create_parquet_metadata(
+ schema_descr: Arc<SchemaDescriptor>,
+ row_groups: Vec<RowGroupMetaData>,
+ ) -> ParquetMetaData {
+ use parquet::file::metadata::FileMetaData;
+
+ let num_rows: i64 = row_groups.iter().map(|rg| rg.num_rows()).sum();
+ let file_meta = FileMetaData::new(
+ 1, // version
+ num_rows, // num_rows
+ None, // created_by
+ None, // key_value_metadata
+ schema_descr, // schema_descr
+ None, // column_orders
+ );
+
+ ParquetMetaData::new(file_meta, row_groups)
+ }
+
+ #[test]
+ fn test_distinct_count_single_row_group_with_ndv() {
+ // Single row group with distinct count should return Exact
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ // Create statistics with distinct_count = 42
+ let stats = ParquetStatistics::int32(
+ Some(1), // min
+ Some(100), // max
+ Some(42), // distinct_count
+ Some(0), // null_count
+ false, // is_deprecated
+ );
+
+ let row_group =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats)], 1000);
+ let metadata = create_parquet_metadata(schema_descr, vec![row_group]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Exact(42)
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_multiple_row_groups_with_ndv() {
+ // Multiple row groups with distinct counts should return Inexact (max of per-group NDVs)
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ // Row group 1: distinct_count = 10
+ let stats1 = ParquetStatistics::int32(
+ Some(1),
+ Some(50),
+ Some(10), // distinct_count
+ Some(0),
+ false,
+ );
+
+ // Row group 2: distinct_count = 20
+ let stats2 = ParquetStatistics::int32(
+ Some(51),
+ Some(100),
+ Some(20), // distinct_count
+ Some(0),
+ false,
+ );
+
+ let row_group1 =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats1)], 500);
+ let row_group2 =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats2)], 500);
+ let metadata =
+ create_parquet_metadata(schema_descr, vec![row_group1, row_group2]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ // Max of distinct counts (lower bound since we can't accurately merge NDV)
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Inexact(20)
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_no_ndv_available() {
+ // No distinct count in statistics should return Absent
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ // Create statistics without distinct_count (None)
+ let stats = ParquetStatistics::int32(
+ Some(1),
+ Some(100),
+ None, // no distinct_count
+ Some(0),
+ false,
+ );
+
+ let row_group =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats)], 1000);
+ let metadata = create_parquet_metadata(schema_descr, vec![row_group]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Absent
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_partial_ndv_below_threshold() {
+ // 1 of 2 row groups has NDV (50% < 75% threshold) -> Absent
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ let stats1 =
+ ParquetStatistics::int32(Some(1), Some(50), Some(15), Some(0), false);
+ let stats2 =
+ ParquetStatistics::int32(Some(51), Some(100), None, Some(0), false);
+
+ let row_group1 =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats1)], 500);
+ let row_group2 =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats2)], 500);
+ let metadata =
+ create_parquet_metadata(schema_descr, vec![row_group1, row_group2]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Absent
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_partial_ndv_above_threshold() {
+ // 3 of 4 row groups have NDV (75% >= 75% threshold) -> Inexact
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ let stats_with = |ndv| {
ParquetStatistics::int32(Some(1), Some(100), Some(ndv), Some(0), false)
+ };
+ let stats_without =
+ ParquetStatistics::int32(Some(1), Some(100), None, Some(0), false);
+
+ let rg1 = create_row_group_with_stats(
+ &schema_descr,
+ vec![Some(stats_with(10))],
+ 250,
+ );
+ let rg2 = create_row_group_with_stats(
+ &schema_descr,
+ vec![Some(stats_with(20))],
+ 250,
+ );
+ let rg3 = create_row_group_with_stats(
+ &schema_descr,
+ vec![Some(stats_with(15))],
+ 250,
+ );
+ let rg4 = create_row_group_with_stats(
+ &schema_descr,
+ vec![Some(stats_without)],
+ 250,
+ );
+ let metadata =
+ create_parquet_metadata(schema_descr, vec![rg1, rg2, rg3, rg4]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Inexact(20)
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_multiple_columns() {
+ // Test with multiple columns, each with different NDV
+ let schema_descr = create_schema_descr(3);
+ let arrow_schema = create_arrow_schema(3);
+
+ // col_0: distinct_count = 5
+ let stats0 =
ParquetStatistics::int32(Some(1), Some(10), Some(5), Some(0), false);
+ // col_1: no distinct_count
+ let stats1 =
ParquetStatistics::int32(Some(1), Some(100), None, Some(0), false);
+ // col_2: distinct_count = 100
+ let stats2 =
ParquetStatistics::int32(Some(1), Some(1000), Some(100), Some(0), false);
+
+ let row_group = create_row_group_with_stats(
+ &schema_descr,
+ vec![Some(stats0), Some(stats1), Some(stats2)],
+ 1000,
+ );
+ let metadata = create_parquet_metadata(schema_descr, vec![row_group]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Exact(5)
+ );
+ assert_eq!(
+ result.column_statistics[1].distinct_count,
+ Precision::Absent
+ );
+ assert_eq!(
+ result.column_statistics[2].distinct_count,
+ Precision::Exact(100)
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_no_statistics_at_all() {
+ // No statistics in row group should return Absent for all stats
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ // Create row group without any statistics
+ let row_group = create_row_group_with_stats(&schema_descr, vec![None], 1000);
+ let metadata = create_parquet_metadata(schema_descr, vec![row_group]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Absent
+ );
+ }
+
+ /// Integration test that reads a real Parquet file with distinct_count statistics.
+ /// The test file was created with DuckDB and has known NDV values:
+ /// - id: NULL (high cardinality, not tracked)
+ /// - category: 10 distinct values
+ /// - name: 5 distinct values
+ #[test]
+ fn test_distinct_count_from_real_parquet_file() {
+ // Path to test file created by DuckDB with distinct_count statistics
+ let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+ path.push("src/test_data/ndv_test.parquet");
+
+ let file = File::open(&path).expect("Failed to open test parquet file");
+ let reader =
+ SerializedFileReader::new(file).expect("Failed to create reader");
+ let parquet_metadata = reader.metadata();
+
+ // Derive Arrow schema from parquet file metadata
+ let arrow_schema = Arc::new(
+ parquet_to_arrow_schema(
+ parquet_metadata.file_metadata().schema_descr(),
+ None,
+ )
+ .expect("Failed to convert schema"),
+ );
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ parquet_metadata,
+ &arrow_schema,
+ )
+ .expect("Failed to extract statistics");
+
+ // id: no distinct_count (high cardinality)
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Absent,
+ "id column should have Absent distinct_count"
+ );
+
+ // category: 10 distinct values
+ assert_eq!(
+ result.column_statistics[1].distinct_count,
+ Precision::Exact(10),
+ "category column should have Exact(10) distinct_count"
+ );
+
+ // name: 5 distinct values
+ assert_eq!(
+ result.column_statistics[2].distinct_count,
+ Precision::Exact(5),
+ "name column should have Exact(5) distinct_count"
+ );
+ }
+ }
}
diff --git a/datafusion/datasource-parquet/src/test_data/ndv_test.parquet b/datafusion/datasource-parquet/src/test_data/ndv_test.parquet
new file mode 100644
index 0000000000..3ecbe320f5
Binary files /dev/null and b/datafusion/datasource-parquet/src/test_data/ndv_test.parquet differ
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index dafcd6ee40..db550e7f14 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -49,7 +49,7 @@ use crate::stream::ObservedStream;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::config::ConfigOptions;
-use datafusion_common::stats::Precision;
+use datafusion_common::stats::{Precision, estimate_ndv_with_overlap};
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{
Result, assert_or_internal_err, exec_err, internal_datafusion_err,
@@ -886,100 +886,6 @@ fn union_distinct_count(
Precision::Inexact(ndv_left + ndv_right)
}
-/// Estimates the distinct count for a union using range overlap,
-/// following the approach used by Trino:
-///
-/// Assumes values are distributed uniformly within each input's
-/// `[min, max]` range (the standard assumption when only summary
-/// statistics are available, classic for scalar-based statistics
-/// propagation). Under uniformity the fraction of an input's
-/// distinct values that land in a sub-range equals the fraction of
-/// the range that sub-range covers.
-///
-/// The combined value space is split into three disjoint regions:
-///
-/// ```text
-/// |-- only A --|-- overlap --|-- only B --|
-/// ```
-///
-/// * **Only in A/B** – values outside the other input's range
-/// contribute `(1 − overlap_a) · NDV_a` and `(1 − overlap_b) · NDV_b`.
-/// * **Overlap** – both inputs may produce values here. We take
-/// `max(overlap_a · NDV_a, overlap_b · NDV_b)` rather than the
-/// sum because values in the same sub-range are likely shared
-/// (the smaller set is assumed to be a subset of the larger).
-/// This is conservative: it avoids inflating the NDV estimate,
-/// which is safer for downstream join-order decisions.
-///
-/// The formula ranges between `[max(NDV_a, NDV_b), NDV_a + NDV_b]`,
-/// from full overlap to no overlap. Boundary cases confirm this:
-/// disjoint ranges → `NDV_a + NDV_b`, identical ranges →
-/// `max(NDV_a, NDV_b)`.
-///
-/// ```text
-/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b) [intersection]
-/// + (1 - overlap_a) * NDV_a [only in A]
-/// + (1 - overlap_b) * NDV_b [only in B]
-/// ```
-fn estimate_ndv_with_overlap(
- left: &ColumnStatistics,
- right: &ColumnStatistics,
- ndv_left: usize,
- ndv_right: usize,
-) -> Option<usize> {
- let min_left = left.min_value.get_value()?;
- let max_left = left.max_value.get_value()?;
- let min_right = right.min_value.get_value()?;
- let max_right = right.max_value.get_value()?;
-
- let range_left = max_left.distance(min_left)?;
- let range_right = max_right.distance(min_right)?;
-
- // Constant columns (range == 0) can't use the proportional overlap
- // formula below, so check interval overlap directly instead.
- if range_left == 0 || range_right == 0 {
- let overlaps = min_left <= max_right && min_right <= max_left;
- return Some(if overlaps {
- usize::max(ndv_left, ndv_right)
- } else {
- ndv_left + ndv_right
- });
- }
-
- let overlap_min = if min_left >= min_right {
- min_left
- } else {
- min_right
- };
- let overlap_max = if max_left <= max_right {
- max_left
- } else {
- max_right
- };
-
- // Short-circuit: when there's no overlap the formula naturally
- // degrades to ndv_left + ndv_right (overlap_range = 0 gives
- // overlap_left = overlap_right = 0), but returning early avoids
- // the floating-point math and a fallible `distance()` call.
- if overlap_min > overlap_max {
- return Some(ndv_left + ndv_right);
- }
-
- let overlap_range = overlap_max.distance(overlap_min)? as f64;
-
- let overlap_left = overlap_range / range_left as f64;
- let overlap_right = overlap_range / range_right as f64;
-
- let intersection = f64::max(
- overlap_left * ndv_left as f64,
- overlap_right * ndv_right as f64,
- );
- let only_left = (1.0 - overlap_left) * ndv_left as f64;
- let only_right = (1.0 - overlap_right) * ndv_right as f64;
-
- Some((intersection + only_left + only_right).round() as usize)
-}
-
fn stats_union(mut left: Statistics, right: Statistics) -> Statistics {
let Statistics {
num_rows: right_num_rows,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
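
Editor's note: the range-overlap formula documented on the removed `estimate_ndv_with_overlap` (which this commit relocates to `datafusion_common::stats`) can be illustrated with a standalone sketch. This is not the DataFusion implementation: the real function takes `&ColumnStatistics` and returns `Option<usize>`; this simplified version assumes plain `f64` ranges so the arithmetic is easy to follow, and checks the two boundary cases the doc comment calls out (identical ranges give `max(NDV_a, NDV_b)`, disjoint ranges give `NDV_a + NDV_b`).

```rust
/// Simplified sketch of the range-overlap NDV estimate described in the
/// removed doc comment above. Assumes uniform value distribution within
/// each input's [min, max] range.
fn estimate_ndv_with_overlap(
    (min_a, max_a): (f64, f64),
    (min_b, max_b): (f64, f64),
    ndv_a: usize,
    ndv_b: usize,
) -> usize {
    let range_a = max_a - min_a;
    let range_b = max_b - min_b;
    // Constant columns (range == 0) can't use the proportional formula;
    // fall back to a direct interval-overlap check.
    if range_a == 0.0 || range_b == 0.0 {
        return if min_a <= max_b && min_b <= max_a {
            ndv_a.max(ndv_b)
        } else {
            ndv_a + ndv_b
        };
    }
    let overlap_min = min_a.max(min_b);
    let overlap_max = max_a.min(max_b);
    // Disjoint ranges: every distinct value on each side is unique to it.
    if overlap_min > overlap_max {
        return ndv_a + ndv_b;
    }
    let overlap = overlap_max - overlap_min;
    // Fraction of each input's range covered by the overlap region.
    let frac_a = overlap / range_a;
    let frac_b = overlap / range_b;
    // Overlap region: take the max, assuming the smaller set is a subset.
    let intersection = (frac_a * ndv_a as f64).max(frac_b * ndv_b as f64);
    let only_a = (1.0 - frac_a) * ndv_a as f64;
    let only_b = (1.0 - frac_b) * ndv_b as f64;
    (intersection + only_a + only_b).round() as usize
}

fn main() {
    // Identical ranges: full overlap -> max(30, 50) = 50
    assert_eq!(estimate_ndv_with_overlap((0.0, 100.0), (0.0, 100.0), 30, 50), 50);
    // Disjoint ranges: no overlap -> 30 + 50 = 80
    assert_eq!(estimate_ndv_with_overlap((0.0, 10.0), (20.0, 30.0), 30, 50), 80);
    // Half overlap on both sides: max(50, 50) + 50 + 50 = 150
    assert_eq!(estimate_ndv_with_overlap((0.0, 100.0), (50.0, 150.0), 100, 100), 150);
    println!("ok");
}
```

As the doc comment notes, the estimate always lands in `[max(NDV_a, NDV_b), NDV_a + NDV_b]`, which keeps it conservative for downstream join-order decisions.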