haohuaijin commented on code in PR #22768:
URL: https://github.com/apache/datafusion/pull/22768#discussion_r3362326103
##########
datafusion/functions-aggregate/src/approx_distinct.rs:
##########
@@ -294,6 +297,371 @@ where
default_accumulator_impl!();
}
/// Upper bound on the number of distinct hashes a per-group sketch keeps in
/// its sparse (list-of-hashes) form before it is converted into a dense
/// [`HyperLogLog`].
///
/// A dense sketch costs [`NUM_REGISTERS`] bytes (16 KiB) no matter how few
/// values it has absorbed. In a high-cardinality `GROUP BY` most groups only
/// see a handful of distinct values, so storing those as a short list of
/// hashes saves a great deal of memory, both while aggregating and when the
/// partial state is serialized for the final phase.
const SPARSE_LIMIT: usize = 256;
+
+/// Per-group HyperLogLog state used by [`HllGroupsAccumulator`].
+///
+/// Starts out as a compact list of the (deduplicated) hashes observed for the
+/// group and only switches to a full dense [`HyperLogLog`] once it has seen
more
+/// than [`SPARSE_LIMIT`] distinct values. Folding the stored hashes into a
dense
+/// sketch produces exactly the same registers as adding the original values
one
+/// by one, so the cardinality estimate is identical to the per-group
+/// [`Accumulator`] path.
+#[derive(Clone, Debug)]
+enum GroupHll {
+ /// Distinct hashes seen so far. May contain duplicates between
compactions.
+ Sparse(Vec<u64>),
+ Dense(Box<HyperLogLog<u8>>),
+}
+
+impl Default for GroupHll {
+ fn default() -> Self {
+ GroupHll::Sparse(Vec::new())
+ }
+}
+
+impl GroupHll {
+ /// Add a pre-computed hash, returning the change in heap-allocated bytes
so
+ /// the accumulator can track its memory usage incrementally.
+ #[inline]
+ fn add_hash(&mut self, hash: u64) -> isize {
+ match self {
+ GroupHll::Dense(hll) => {
+ hll.add_hashed(hash);
+ 0
+ }
+ GroupHll::Sparse(v) => {
+ let cap_before = v.capacity();
+ v.push(hash);
+ if v.len() >= 2 * SPARSE_LIMIT {
+ return self.compact_or_promote(cap_before);
+ }
+ ((v.capacity() - cap_before) * size_of::<u64>()) as isize
+ }
+ }
+ }
+
+ /// Deduplicate the sparse hash list and, if it still exceeds
+ /// [`SPARSE_LIMIT`] distinct values, promote it to a dense sketch.
+ #[cold]
+ fn compact_or_promote(&mut self, cap_before: usize) -> isize {
+ let GroupHll::Sparse(v) = self else {
+ return 0;
+ };
+ v.sort_unstable();
+ v.dedup();
+ if v.len() > SPARSE_LIMIT {
+ let mut hll = HyperLogLog::<u8>::new();
+ for &h in v.iter() {
+ hll.add_hashed(h);
+ }
+ *self = GroupHll::Dense(Box::new(hll));
+ (NUM_REGISTERS as isize) - ((cap_before * size_of::<u64>()) as
isize)
+ } else {
+ // capacity is unchanged by sort/dedup
+ 0
+ }
+ }
+
+ /// Merge a serialized state (produced by [`Self::serialize`] or by the
+ /// per-group [`Accumulator`]) into this sketch.
+ fn merge_serialized(&mut self, bytes: &[u8]) -> Result<isize> {
+ if bytes.is_empty() {
+ return Ok(0);
+ }
+ if bytes.len() == NUM_REGISTERS {
+ let other: HyperLogLog<u8> = bytes.try_into()?;
+ Ok(self.merge_dense(&other))
+ } else {
+ debug_assert_eq!(bytes.len() % size_of::<u64>(), 0);
+ let mut delta = 0;
+ for chunk in bytes.chunks_exact(size_of::<u64>()) {
+ let h = u64::from_le_bytes(chunk.try_into().unwrap());
+ delta += self.add_hash(h);
+ }
+ Ok(delta)
+ }
+ }
+
+ /// Merge a dense sketch into this one, promoting to dense if necessary.
+ fn merge_dense(&mut self, other: &HyperLogLog<u8>) -> isize {
+ match self {
+ GroupHll::Dense(hll) => {
+ hll.merge(other);
+ 0
+ }
+ GroupHll::Sparse(v) => {
+ let cap_before = v.capacity();
+ let mut hll = other.clone();
+ for &h in v.iter() {
+ hll.add_hashed(h);
+ }
+ *self = GroupHll::Dense(Box::new(hll));
+ (NUM_REGISTERS as isize) - ((cap_before * size_of::<u64>()) as
isize)
+ }
+ }
+ }
+
+ /// The approximate number of distinct values seen by this group.
+ fn count(&self) -> u64 {
+ match self {
+ GroupHll::Dense(hll) => hll.count() as u64,
+ // Estimate directly from the stored hashes; this produces exactly
the
+ // same value as folding them into a dense sketch but avoids
+ // allocating and scanning a 16 KiB register array for every group.
+ GroupHll::Sparse(v) => count_from_hashes(v) as u64,
+ }
+ }
+
+ /// Heap bytes held by this sketch. Mirrors the deltas accrued in
+ /// [`Self::add_hash`] / [`Self::merge_dense`] so emitting a group can
+ /// precisely reverse them.
+ fn heap_bytes(&self) -> usize {
+ match self {
+ GroupHll::Sparse(v) => v.capacity() * size_of::<u64>(),
+ GroupHll::Dense(_) => NUM_REGISTERS,
+ }
+ }
+
+ /// Serialize the sketch into `scratch` (which is cleared first). A dense
+ /// sketch is written as its raw [`NUM_REGISTERS`] registers
(wire-compatible
+ /// with the per-group [`Accumulator`]); a sparse sketch is written as its
+ /// distinct hashes in little-endian order.
+ fn serialize(&mut self, scratch: &mut Vec<u8>) {
+ scratch.clear();
+ match self {
+ GroupHll::Dense(hll) => {
+ let registers: &[u8] = (**hll).as_ref();
+ scratch.extend_from_slice(registers);
+ }
+ GroupHll::Sparse(v) => {
+ v.sort_unstable();
+ v.dedup();
+ for &h in v.iter() {
+ scratch.extend_from_slice(&h.to_le_bytes());
+ }
+ }
+ }
+ }
+}
+
+/// Computes HyperLogLog hashes for the rows of an input array, type by type.
+///
+/// The hashing matches the per-group [`Accumulator`] implementations exactly
so
+/// that the grouped and ungrouped paths produce identical estimates.
+trait HllValueHasher: Send + Sync + 'static {
+ /// Invoke `f(row_index, hash)` for every non-null row of `array`.
+ fn for_each_hash(array: &dyn Array, f: impl FnMut(usize, u64));
+}
+
+struct NumericHasher<T>(PhantomData<T>);
+
+impl<T> HllValueHasher for NumericHasher<T>
+where
+ T: ArrowPrimitiveType + Send + Sync + 'static,
+ T::Native: Hash,
+{
+ #[inline]
+ fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) {
+ let array: &PrimitiveArray<T> = array.as_primitive::<T>();
+ if array.null_count() == 0 {
+ for (i, v) in array.values().iter().enumerate() {
+ f(i, HLL_HASH_STATE.hash_one(v));
+ }
+ } else {
+ for i in 0..array.len() {
+ if array.is_valid(i) {
+ f(i, HLL_HASH_STATE.hash_one(array.value(i)));
+ }
+ }
+ }
+ }
+}
+
+struct Utf8Hasher<O>(PhantomData<O>);
+
+impl<O: OffsetSizeTrait> HllValueHasher for Utf8Hasher<O> {
+ #[inline]
+ fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) {
+ let array: &GenericStringArray<O> = array.as_string::<O>();
+ for i in 0..array.len() {
+ if array.is_valid(i) {
+ f(i, HLL_HASH_STATE.hash_one(array.value(i)));
+ }
+ }
+ }
+}
+
+struct Utf8ViewHasher;
+
+impl HllValueHasher for Utf8ViewHasher {
+ #[inline]
+ fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) {
+ let array: &StringViewArray = array.as_string_view();
+ // Mirror `StringViewHLLAccumulator`: hash the raw inline view when all
+ // strings are stored inline (≤ 12 bytes), avoiding `&str`
materialization.
+ if array.data_buffers().is_empty() {
+ let views = array.views();
+ for i in 0..array.len() {
+ if array.is_valid(i) {
+ f(i, HLL_HASH_STATE.hash_one(views[i]));
+ }
+ }
+ } else {
+ for i in 0..array.len() {
+ if array.is_valid(i) {
+ f(i, HLL_HASH_STATE.hash_one(array.value(i)));
+ }
+ }
+ }
+ }
+}
+
+struct BinaryHasher<O>(PhantomData<O>);
+
+impl<O: OffsetSizeTrait> HllValueHasher for BinaryHasher<O> {
+ #[inline]
+ fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) {
+ let array: &GenericBinaryArray<O> = array.as_binary::<O>();
+ for i in 0..array.len() {
+ if array.is_valid(i) {
+ f(i, HLL_HASH_STATE.hash_one(array.value(i)));
+ }
+ }
+ }
+}
+
+/// A [`GroupsAccumulator`] for `approx_distinct` that keeps one adaptive
+/// (sparse → dense) HyperLogLog sketch per group.
+///
+/// This is dramatically faster than the generic `GroupsAccumulatorAdapter`
+/// fallback for high-cardinality `GROUP BY`s: it processes the whole input in
a
+/// single vectorized pass (no per-group `take`/slice and no dynamic dispatch),
+/// and the sparse representation avoids allocating a 16 KiB sketch for every
+/// group when most groups only see a few distinct values.
+struct HllGroupsAccumulator<H: HllValueHasher> {
+ /// Per-group sketches, indexed by `group_index`.
+ groups: Vec<GroupHll>,
+ /// Incrementally maintained estimate of heap bytes used by `groups`.
+ allocated_bytes: usize,
+ phantom: PhantomData<H>,
+}
+
+impl<H: HllValueHasher> HllGroupsAccumulator<H> {
+ fn new() -> Self {
+ Self {
+ groups: Vec::new(),
+ allocated_bytes: 0,
+ phantom: PhantomData,
+ }
+ }
+
+ #[inline]
+ fn ensure_groups(&mut self, total_num_groups: usize) {
+ if total_num_groups > self.groups.len() {
+ self.groups.resize_with(total_num_groups, GroupHll::default);
+ }
+ }
+
+ #[inline]
+ fn apply_delta(&mut self, delta: isize) {
+ self.allocated_bytes =
+ (self.allocated_bytes as isize).saturating_add(delta).max(0) as
usize;
+ }
+}
+
+impl<H: HllValueHasher> GroupsAccumulator for HllGroupsAccumulator<H> {
+ fn update_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ self.ensure_groups(total_num_groups);
+ let groups = &mut self.groups;
+ let mut delta: isize = 0;
+ match opt_filter {
+ None => H::for_each_hash(values[0].as_ref(), |row, hash| {
+ delta += groups[group_indices[row]].add_hash(hash);
+ }),
+ Some(filter) => H::for_each_hash(values[0].as_ref(), |row, hash| {
+ if filter.value(row) {
+ delta += groups[group_indices[row]].add_hash(hash);
+ }
+ }),
+ }
+ self.apply_delta(delta);
+ Ok(())
+ }
+
+ fn merge_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ self.ensure_groups(total_num_groups);
+ let states = downcast_value!(values[0], BinaryArray);
+ let mut delta: isize = 0;
+ for (row, &group_index) in group_indices.iter().enumerate() {
+ if let Some(filter) = opt_filter
+ && !filter.value(row)
+ {
+ continue;
+ }
+ if states.is_valid(row) {
+ delta +=
self.groups[group_index].merge_serialized(states.value(row))?;
+ }
+ }
+ self.apply_delta(delta);
+ Ok(())
+ }
+
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+ let groups = emit_to.take_needed(&mut self.groups);
+ let mut freed = 0;
+ let counts: UInt64Array = groups
+ .iter()
+ .map(|g| {
+ freed += g.heap_bytes();
+ Some(g.count())
+ })
+ .collect();
+ // The emitted groups have been removed; reclaim their tracked bytes.
+ self.allocated_bytes = self.allocated_bytes.saturating_sub(freed);
+ Ok(Arc::new(counts))
+ }
+
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ let mut groups = emit_to.take_needed(&mut self.groups);
+ let mut builder = BinaryBuilder::new();
+ let mut scratch: Vec<u8> = Vec::new();
+ let mut freed = 0;
+ for g in groups.iter_mut() {
+ freed += g.heap_bytes();
+ g.serialize(&mut scratch);
+ builder.append_value(&scratch);
Review Comment:
It will return 0; I checked, and this is consistent with the current main branch and
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]