EeshanBembi commented on code in PR #21782:
URL: https://github.com/apache/datafusion/pull/21782#discussion_r3130129903


##########
datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs:
##########
@@ -518,3 +520,162 @@ impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 {
         size_of_val(self) + 8192
     }
 }
+
+/// Sliding-window variant of [`PrimitiveDistinctCountAccumulator`].
+#[derive(Debug)]
+pub struct SlidingPrimitiveDistinctCountAccumulator<T>
+where
+    T: ArrowPrimitiveType + Send,
+    T::Native: Eq + Hash,
+{
+    counts: HashMap<T::Native, usize, RandomState>,
+    data_type: DataType,
+}
+
+impl<T> SlidingPrimitiveDistinctCountAccumulator<T>
+where
+    T: ArrowPrimitiveType + Send,
+    T::Native: Eq + Hash,
+{
+    pub fn new(data_type: &DataType) -> Self {
+        Self {
+            counts: HashMap::default(),
+            data_type: data_type.clone(),
+        }
+    }
+}
+
+impl<T> Accumulator for SlidingPrimitiveDistinctCountAccumulator<T>
+where
+    T: ArrowPrimitiveType + Send + Debug,
+    T::Native: Eq + Hash,
+{
+    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
+        let arr = Arc::new(
+            PrimitiveArray::<T>::from_iter_values(self.counts.keys().cloned())
+                .with_data_type(self.data_type.clone()),
+        );
+        Ok(vec![
+            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
+        ])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_primitive_array::<T>(&values[0])?;
+        if arr.null_count() == 0 {
+            for value in arr.values().iter() {
+                *self.counts.entry(*value).or_insert(0) += 1;
+            }
+        } else {
+            for idx in 0..arr.len() {
+                if arr.is_valid(idx) {
+                    *self.counts.entry(arr.value(idx)).or_insert(0) += 1;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_primitive_array::<T>(&values[0])?;
+        if arr.null_count() == 0 {
+            for value in arr.values().iter() {
+                if let Some(count) = self.counts.get_mut(value) {

Review Comment:
   The retract path silently ignores values not found in `self.counts`:
   
   ```rust
   if let Some(count) = self.counts.get_mut(value) {
       *count -= 1;
       if *count == 0 { self.counts.remove(value); }
   }
   // values absent from the map: silently skipped
   ```
   
   In the current window-frame infrastructure the caller guarantees symmetry
   (every retract matches a prior update), so the skipped case never occurs in
   practice. But a silent no-op means that bugs in the caller become invisible
   and corrupt the distinct count without any diagnostic. Please add at least a
   `debug_assert!`:
   
   ```rust
   let count = self.counts.get_mut(value);
   debug_assert!(count.is_some(), "retract_batch called for a value not in the accumulator");
   if let Some(count) = count {
       *count -= 1;
       if *count == 0 { self.counts.remove(value); }
   }
   ```
   
   This has zero cost in release builds and catches invariant violations
   immediately in debug and test builds, where debug assertions are enabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to