kosiew commented on code in PR #21548:
URL: https://github.com/apache/datafusion/pull/21548#discussion_r3135964158
##########
datafusion/spark/src/function/aggregate/avg.rs:
##########
@@ -343,7 +345,141 @@ where
])
}
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let sums = values[0]
+            .as_primitive::<T>()
+            .clone()
+            .with_data_type(self.return_data_type.clone());
+
+        let counts = Int64Array::from_value(1, sums.len());
+
+        // null out filtered / null input rows
+        let filter_nulls = opt_filter.map(|f| {
+            let (bools, nulls) = f.clone().into_parts();
+            NullBuffer::union(Some(&NullBuffer::from(bools)), nulls.as_ref())
+                .expect("at least one input is Some")
+        });
+        let nulls =
+            NullBuffer::union(filter_nulls.as_ref(), sums.logical_nulls().as_ref());
+
+        let (dt, buf, _) = sums.into_parts();
+        let sums = PrimitiveArray::<T>::new(buf, nulls.clone()).with_data_type(dt);
+        let (_, buf, _) = counts.into_parts();
+        let counts = Int64Array::new(buf, nulls);
+
+        // [sum, count] - must match state() and merge_batch()
+        Ok(vec![
+            Arc::new(sums) as ArrayRef,
+            Arc::new(counts) as ArrayRef,
+        ])
+    }
+
+    fn supports_convert_to_state(&self) -> bool {
Review Comment:
Could we hold off on enabling `supports_convert_to_state()` here until the
merge path is null-aware?
`merge_batch()` above still merges `partial_sums.values()` and
`partial_counts.values()` directly, without consulting the null bitmap. Since
`convert_to_state()` encodes filtered rows and null inputs as null state
entries, once skip-partial-aggregation kicks in, the backing buffer values of
those rows can still be folded into the merge.
For example, `avg([1.0, NULL, 3.0])` could be merged as if the null row were
present, producing the wrong average. I think this needs a null-aware merge
path, similar to the built-in Avg, before it can safely be turned on.
##########
datafusion/spark/src/function/aggregate/avg.rs:
##########
@@ -343,7 +345,141 @@ where
])
}
+ fn convert_to_state(
+ &self,
+ values: &[ArrayRef],
+ opt_filter: Option<&BooleanArray>,
+ ) -> Result<Vec<ArrayRef>> {
+ let sums = values[0]
+ .as_primitive::<T>()
+ .clone()
+ .with_data_type(self.return_data_type.clone());
+
+ let counts = Int64Array::from_value(1, sums.len());
+
+ // null out filtered / null input rows
Review Comment:
Small suggestion while we are in this area: could this reuse the shared
`filtered_null_mask` and `set_nulls` helpers from
`datafusion_functions_aggregate_common` instead of open-coding the null-mask
construction?
The built-in Avg already uses those utilities, and sharing them here would
make the Spark implementation easier to compare against it and less likely to
drift again.
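For reference, the semantics those helpers provide can be modeled in a few lines (standalone illustrative code, not the `datafusion_functions_aggregate_common` signatures): a state row stays valid only if the filter keeps it AND the input value itself is non-null, which is the same mask the open-coded `NullBuffer::union` chain above computes.

```rust
// Hedged model of the filter-aware null mask: validity masks are plain bool
// slices here instead of arrow NullBuffers.
fn filtered_null_mask(filter: Option<&[bool]>, input_valid: &[bool]) -> Vec<bool> {
    input_valid
        .iter()
        .enumerate()
        // row is valid iff the input is non-null and the filter (if any) keeps it
        .map(|(i, &valid)| valid && filter.map_or(true, |f| f[i]))
        .collect()
}

fn main() {
    // filter drops row 2; row 1 is a null input; only row 0 survives
    let mask = filtered_null_mask(Some(&[true, true, false]), &[true, false, true]);
    assert_eq!(mask, vec![true, false, false]);
}
```

Using the shared helpers would let this mask construction live in one place instead of being re-derived per accumulator.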
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]