haohuaijin commented on code in PR #22768:
URL: https://github.com/apache/datafusion/pull/22768#discussion_r3362395813
##########
datafusion/functions-aggregate/src/approx_distinct.rs:
##########
@@ -481,7 +849,209 @@ impl AggregateUDFImpl for ApproxDistinct {
Ok(accumulator)
}
+ fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
+ is_hll_groups_type(args.expr_fields[0].data_type())
+ }
+
+ fn create_groups_accumulator(
+ &self,
+ args: AccumulatorArgs,
+ ) -> Result<Box<dyn GroupsAccumulator>> {
+ let data_type = args.expr_fields[0].data_type();
+ let accumulator: Box<dyn GroupsAccumulator> = match data_type {
+ DataType::UInt32 => {
+
Box::new(HllGroupsAccumulator::<NumericHasher<UInt32Type>>::new())
+ }
+ DataType::UInt64 => {
+
Box::new(HllGroupsAccumulator::<NumericHasher<UInt64Type>>::new())
+ }
+ DataType::Int32 => {
+
Box::new(HllGroupsAccumulator::<NumericHasher<Int32Type>>::new())
+ }
+ DataType::Int64 => {
+
Box::new(HllGroupsAccumulator::<NumericHasher<Int64Type>>::new())
+ }
+ DataType::Date32 => {
+
Box::new(HllGroupsAccumulator::<NumericHasher<Date32Type>>::new())
+ }
+ DataType::Date64 => {
+
Box::new(HllGroupsAccumulator::<NumericHasher<Date64Type>>::new())
+ }
+ DataType::Time32(TimeUnit::Second) => {
+
Box::new(HllGroupsAccumulator::<NumericHasher<Time32SecondType>>::new())
+ }
+ DataType::Time32(TimeUnit::Millisecond) =>
Box::new(HllGroupsAccumulator::<
+ NumericHasher<Time32MillisecondType>,
+ >::new()),
+ DataType::Time64(TimeUnit::Microsecond) =>
Box::new(HllGroupsAccumulator::<
+ NumericHasher<Time64MicrosecondType>,
+ >::new()),
+ DataType::Time64(TimeUnit::Nanosecond) =>
Box::new(HllGroupsAccumulator::<
+ NumericHasher<Time64NanosecondType>,
+ >::new()),
+ DataType::Timestamp(TimeUnit::Second, _) =>
Box::new(HllGroupsAccumulator::<
+ NumericHasher<TimestampSecondType>,
+ >::new()),
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
+ Box::new(HllGroupsAccumulator::<
+ NumericHasher<TimestampMillisecondType>,
+ >::new())
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
+ Box::new(HllGroupsAccumulator::<
+ NumericHasher<TimestampMicrosecondType>,
+ >::new())
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => Box::new(
+
HllGroupsAccumulator::<NumericHasher<TimestampNanosecondType>>::new(),
+ ),
+ DataType::Utf8 =>
Box::new(HllGroupsAccumulator::<Utf8Hasher<i32>>::new()),
+ DataType::LargeUtf8 => {
+ Box::new(HllGroupsAccumulator::<Utf8Hasher<i64>>::new())
+ }
+ DataType::Utf8View =>
Box::new(HllGroupsAccumulator::<Utf8ViewHasher>::new()),
+ DataType::Binary => {
+ Box::new(HllGroupsAccumulator::<BinaryHasher<i32>>::new())
+ }
+ DataType::LargeBinary => {
+ Box::new(HllGroupsAccumulator::<BinaryHasher<i64>>::new())
+ }
+ other => {
+ return not_impl_err!(
+ "GroupsAccumulator for 'approx_distinct' is not implemented for data type {other}"
+ );
+ }
+ };
+ Ok(accumulator)
+ }
+
fn documentation(&self) -> Option<&Documentation> {
self.doc()
}
}
+
+/// Returns true for the data types backed by the HyperLogLog
+/// [`HllGroupsAccumulator`]. The fixed-domain types (booleans / small ints) and
+/// `Null` fall back to the per-group [`Accumulator`] path.
+fn is_hll_groups_type(data_type: &DataType) -> bool {
Review Comment:
Updated in
[db8baf2](https://github.com/apache/datafusion/pull/22768/commits/db8baf2ee4f51086b71a601d65322d09608565fa)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]