asolimando commented on code in PR #21430:
URL: https://github.com/apache/datafusion/pull/21430#discussion_r3123869604
##########
datafusion/physical-plan/src/union.rs:
##########
@@ -959,294 +903,113 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_stats_union() {
- let left = Statistics {
- num_rows: Precision::Exact(5),
- total_byte_size: Precision::Exact(23),
- column_statistics: vec![
- ColumnStatistics {
- distinct_count: Precision::Exact(5),
- max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
- min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
- sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
- null_count: Precision::Exact(0),
- byte_size: Precision::Absent,
- },
- ColumnStatistics {
- distinct_count: Precision::Exact(1),
- max_value: Precision::Exact(ScalarValue::from("x")),
- min_value: Precision::Exact(ScalarValue::from("a")),
- sum_value: Precision::Absent,
- null_count: Precision::Exact(3),
- byte_size: Precision::Absent,
- },
- ColumnStatistics {
- distinct_count: Precision::Absent,
- max_value:
Precision::Exact(ScalarValue::Float32(Some(1.1))),
- min_value:
Precision::Exact(ScalarValue::Float32(Some(0.1))),
- sum_value:
Precision::Exact(ScalarValue::Float32(Some(42.0))),
- null_count: Precision::Absent,
- byte_size: Precision::Absent,
- },
- ],
- };
+ fn stats_merge_inputs() -> (SchemaRef, Statistics, Statistics, Statistics)
{
+ let schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::UInt32, true)]));
+
+ let left = Statistics::default()
+ .with_num_rows(Precision::Exact(5))
+ .with_total_byte_size(Precision::Exact(23))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_distinct_count(Precision::Exact(5))
+
.with_min_value(Precision::Exact(ScalarValue::UInt32(Some(1))))
+
.with_max_value(Precision::Exact(ScalarValue::UInt32(Some(21))))
+
.with_sum_value(Precision::Exact(ScalarValue::UInt32(Some(42))))
+ .with_null_count(Precision::Exact(0))
+ .with_byte_size(Precision::Exact(40)),
+ );
- let right = Statistics {
- num_rows: Precision::Exact(7),
- total_byte_size: Precision::Exact(29),
- column_statistics: vec![
- ColumnStatistics {
- distinct_count: Precision::Exact(3),
- max_value: Precision::Exact(ScalarValue::Int64(Some(34))),
- min_value: Precision::Exact(ScalarValue::Int64(Some(1))),
- sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
- null_count: Precision::Exact(1),
- byte_size: Precision::Absent,
- },
- ColumnStatistics {
- distinct_count: Precision::Absent,
- max_value: Precision::Exact(ScalarValue::from("c")),
- min_value: Precision::Exact(ScalarValue::from("b")),
- sum_value: Precision::Absent,
- null_count: Precision::Absent,
- byte_size: Precision::Absent,
- },
- ColumnStatistics {
- distinct_count: Precision::Absent,
- max_value: Precision::Absent,
- min_value: Precision::Absent,
- sum_value: Precision::Absent,
- null_count: Precision::Absent,
- byte_size: Precision::Absent,
- },
- ],
- };
+ let right = Statistics::default()
+ .with_num_rows(Precision::Exact(7))
+ .with_total_byte_size(Precision::Exact(29))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_distinct_count(Precision::Exact(3))
+
.with_min_value(Precision::Exact(ScalarValue::UInt32(Some(22))))
+
.with_max_value(Precision::Exact(ScalarValue::UInt32(Some(34))))
+
.with_sum_value(Precision::Exact(ScalarValue::UInt32(Some(8))))
+ .with_null_count(Precision::Exact(1))
+ .with_byte_size(Precision::Exact(60)),
+ );
- let result = stats_union(left, right);
- let expected = Statistics {
- num_rows: Precision::Exact(12),
- total_byte_size: Precision::Exact(52),
- column_statistics: vec![
- ColumnStatistics {
- distinct_count: Precision::Inexact(6),
- max_value: Precision::Exact(ScalarValue::Int64(Some(34))),
- min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
- sum_value: Precision::Exact(ScalarValue::Int64(Some(84))),
- null_count: Precision::Exact(1),
- byte_size: Precision::Absent,
- },
- ColumnStatistics {
- distinct_count: Precision::Absent,
- max_value: Precision::Exact(ScalarValue::from("x")),
- min_value: Precision::Exact(ScalarValue::from("a")),
- sum_value: Precision::Absent,
- null_count: Precision::Absent,
- byte_size: Precision::Absent,
- },
- ColumnStatistics {
- distinct_count: Precision::Absent,
- max_value: Precision::Absent,
- min_value: Precision::Absent,
- sum_value: Precision::Absent,
- null_count: Precision::Absent,
- byte_size: Precision::Absent,
- },
- ],
- };
+ let expected = Statistics::default()
+ .with_num_rows(Precision::Exact(12))
+ .with_total_byte_size(Precision::Exact(52))
+ .add_column_statistics(
+ ColumnStatistics::new_unknown()
+ .with_distinct_count(Precision::Inexact(8))
+
.with_min_value(Precision::Exact(ScalarValue::UInt32(Some(1))))
+
.with_max_value(Precision::Exact(ScalarValue::UInt32(Some(34))))
+
.with_sum_value(Precision::Exact(ScalarValue::UInt64(Some(50))))
+ .with_null_count(Precision::Exact(1))
+ .with_byte_size(Precision::Exact(100)),
+ );
- assert_eq!(result, expected);
+ (schema, left, right, expected)
}
#[test]
- fn test_union_distinct_count() {
Review Comment:
I think test coverage has still regressed somewhat:
the old `test_stats_union` covered multi-column merging with mixed types
(`Int64`, `Utf8`, `Float32`) and a mix of absent and present stats across columns. The
new tests use a single `UInt32` column with all stats present.
Could you add a multi-column test case (e.g., 2-3 columns with different
types, some with absent stats) to close the gap?
##########
datafusion/common/src/stats.rs:
##########
@@ -713,7 +713,7 @@ impl Statistics {
) {
(Some(&l), Some(&r)) => Precision::Inexact(
estimate_ndv_with_overlap(col_stats, item_cs, l, r)
- .unwrap_or_else(|| usize::max(l, r)),
+ .unwrap_or_else(|| l.saturating_add(r)),
Review Comment:
The proposed change at this line is a semantic change. The new fallback is
sensible for unions (the inputs are independent streams, so summing NDVs gives a good upper bound),
but this function is also used to merge statistics across Parquet files (see
[statistics.rs#L482](https://github.com/apache/datafusion/blob/8a45d0250f3095230902043544a50bdc330131b1/datafusion/datasource/src/statistics.rs#L482)
and
[statistics.rs#L528](https://github.com/apache/datafusion/blob/8a45d0250f3095230902043544a50bdc330131b1/datafusion/datasource/src/statistics.rs#L528)).
There, `max` is the more conventional fallback: files from the same table are
likely to share common values, so summing NDVs would generally overshoot.
One option would be to make the fallback configurable (e.g., an enum
`NdvFallback::Max` vs `NdvFallback::Sum`), so that callers can choose based on
their own semantics. WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]