kosiew commented on code in PR #21548:
URL: https://github.com/apache/datafusion/pull/21548#discussion_r3135964158


##########
datafusion/spark/src/function/aggregate/avg.rs:
##########
@@ -343,7 +345,141 @@ where
         ])
     }
 
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let sums = values[0]
+            .as_primitive::<T>()
+            .clone()
+            .with_data_type(self.return_data_type.clone());
+
+        let counts = Int64Array::from_value(1, sums.len());
+
+        // null out filtered / null input rows
+        let filter_nulls = opt_filter.map(|f| {
+            let (bools, nulls) = f.clone().into_parts();
+            NullBuffer::union(Some(&NullBuffer::from(bools)), nulls.as_ref())
+                .expect("at least one input is Some")
+        });
+        let nulls =
+            NullBuffer::union(filter_nulls.as_ref(), 
sums.logical_nulls().as_ref());
+
+        let (dt, buf, _) = sums.into_parts();
+        let sums = PrimitiveArray::<T>::new(buf, 
nulls.clone()).with_data_type(dt);
+        let (_, buf, _) = counts.into_parts();
+        let counts = Int64Array::new(buf, nulls);
+
+        // [sum, count] - must match state() and merge_batch()
+        Ok(vec![
+            Arc::new(sums) as ArrayRef,
+            Arc::new(counts) as ArrayRef,
+        ])
+    }
+
+    fn supports_convert_to_state(&self) -> bool {

Review Comment:
   Could we hold off on enabling `supports_convert_to_state()` here until the 
merge path is null-aware?
   
   `merge_batch()` above still merges `partial_sums.values()` and 
`partial_counts.values()` directly, without checking the null bitmap. Since 
`convert_to_state()` encodes filtered rows and null inputs as null state 
entries, once skip-partial-aggregation kicks in those rows can still contribute 
their backing values during merge.
   
   For example, `avg([1.0, NULL, 3.0])` could be merged as if the null row were 
present, which would produce the wrong average. I think this needs a null-aware 
merge path, similar to built-in Avg, before this can safely be turned on.



##########
datafusion/spark/src/function/aggregate/avg.rs:
##########
@@ -343,7 +345,141 @@ where
         ])
     }
 
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let sums = values[0]
+            .as_primitive::<T>()
+            .clone()
+            .with_data_type(self.return_data_type.clone());
+
+        let counts = Int64Array::from_value(1, sums.len());
+
+        // null out filtered / null input rows

Review Comment:
   Small suggestion while we are in this area: could this reuse the shared 
`filtered_null_mask` and `set_nulls` helpers from 
`datafusion_functions_aggregate_common` instead of open-coding the null-mask 
construction?
   
   The built-in Avg already uses those utilities, and sharing them here would 
make the Spark implementation easier to compare against it and less likely to 
drift again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to