This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eb041ce45c6 smart distinct count based on hllplus (#17011)
eb041ce45c6 is described below

commit eb041ce45c65f1355ef3d95899de2e86143502d6
Author: Mohemmad Zaid Khan <[email protected]>
AuthorDate: Wed Oct 15 00:58:53 2025 +0530

    smart distinct count based on hllplus (#17011)
---
 .../query/NonScanBasedAggregationOperator.java     |  14 +
 .../pinot/core/plan/AggregationPlanNode.java       |   3 +-
 .../function/AggregationFunctionFactory.java       |   2 +
 ...stinctCountSmartHLLPlusAggregationFunction.java | 432 +++++++++++++++++++++
 .../pinot/queries/DistinctCountQueriesTest.java    |  87 +++++
 .../pinot/segment/spi/AggregationFunctionType.java |   2 +
 6 files changed, 539 insertions(+), 1 deletion(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
index 8e7f6a41639..e679264ce58 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
@@ -44,6 +44,7 @@ import 
org.apache.pinot.core.query.aggregation.function.DistinctCountOffHeapAggr
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLPlusAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLPlusAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartULLAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountULLAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -164,6 +165,10 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
           result = 
getDistinctCountSmartHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
               (DistinctCountSmartHLLAggregationFunction) aggregationFunction);
           break;
+        case DISTINCTCOUNTSMARTHLLPLUS:
+          result = 
getDistinctCountSmartHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()),
+              (DistinctCountSmartHLLPlusAggregationFunction) 
aggregationFunction);
+          break;
         case DISTINCTCOUNTULL:
           result = 
getDistinctCountULLResult(Objects.requireNonNull(dataSource.getDictionary()),
               (DistinctCountULLAggregationFunction) aggregationFunction);
@@ -356,6 +361,15 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
       return getDistinctValueSet(dictionary);
     }
   }
+  /**
+   * Produces the intermediate result for DISTINCTCOUNTSMARTHLLPLUS straight from the dictionary.
+   * A dictionary whose length does not exceed the function's threshold is passed to
+   * {@code getDistinctValueSet}; a longer one is passed to {@code getDistinctValueHLLPlus} together
+   * with the function's p and sp.
+   */
+  private static Object getDistinctCountSmartHLLPlusResult(Dictionary dictionary,
+      DistinctCountSmartHLLPlusAggregationFunction function) {
+    boolean withinThreshold = dictionary.length() <= function.getThreshold();
+    if (withinThreshold) {
+      return getDistinctValueSet(dictionary);
+    }
+    // Dictionary size exceeds the conversion threshold: use the HLLPlus form instead
+    return getDistinctValueHLLPlus(dictionary, function.getP(), function.getSp());
+  }
 
   private static UltraLogLog getDistinctCountULLResult(Dictionary dictionary,
       DistinctCountULLAggregationFunction function) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 530cd3d9937..055a366749b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -53,7 +53,8 @@ public class AggregationPlanNode implements PlanNode {
           DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTSUM, DISTINCTSUMMV, 
DISTINCTAVG, DISTINCTAVGMV, DISTINCTCOUNTOFFHEAP,
           DISTINCTCOUNTHLL, DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, 
DISTINCTCOUNTRAWHLLMV, DISTINCTCOUNTHLLPLUS,
           DISTINCTCOUNTHLLPLUSMV, DISTINCTCOUNTRAWHLLPLUS, 
DISTINCTCOUNTRAWHLLPLUSMV, DISTINCTCOUNTULL,
-          DISTINCTCOUNTRAWULL, SEGMENTPARTITIONEDDISTINCTCOUNT, 
DISTINCTCOUNTSMARTHLL, DISTINCTCOUNTSMARTULL);
+          DISTINCTCOUNTRAWULL, SEGMENTPARTITIONEDDISTINCTCOUNT, 
DISTINCTCOUNTSMARTHLL, DISTINCTCOUNTSMARTHLLPLUS,
+          DISTINCTCOUNTSMARTULL);
 
   // DISTINCTCOUNT excluded because consuming segment metadata contains 
unknown cardinality when there is no dictionary
   // MINSTRING / MAXSTRING excluded because of string column metadata issues 
(see discussion in
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index d6824d75b24..af18064d185 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -384,6 +384,8 @@ public class AggregationFunctionFactory {
             return new DistinctCountRawHLLAggregationFunction(arguments);
           case DISTINCTCOUNTSMARTHLL:
             return new DistinctCountSmartHLLAggregationFunction(arguments);
+          case DISTINCTCOUNTSMARTHLLPLUS:
+            return new DistinctCountSmartHLLPlusAggregationFunction(arguments);
           case DISTINCTCOUNTSMARTULL:
             return new DistinctCountSmartULLAggregationFunction(arguments);
           case FASTHLL:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLPlusAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLPlusAggregationFunction.java
new file mode 100644
index 00000000000..cca0d5b143b
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLPlusAggregationFunction.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code DistinctCountSmartHLLPlusAggregationFunction} calculates the 
number of distinct values for a given
+ * expression (both single-valued and multi-valued are supported).
+ *
+ * For aggregation-only queries, the distinct values are stored in a Set initially. Once the number of distinct values
+ * exceeds a threshold, the Set will be converted into a HyperLogLogPlus, and an approximate result will be returned.
+ *
+ * The function takes an optional second argument for parameters:
+ * - threshold: Threshold of the number of distinct values to trigger the 
conversion, 100_000 by default. Non-positive
+ *              value means never convert.
+ * - p: Parameter p for the converted HyperLogLogPlus, 14 by default.
+ * - sp: Parameter sp for the converted HyperLogLogPlus, 0 by default.
+ * Example of second argument: 'threshold=10;p=12;sp=25'
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctCountSmartHLLPlusAggregationFunction extends 
BaseDistinctCountSmartSketchAggregationFunction {
+
+  private final int _threshold;
+  private final int _p;
+  private final int _sp;
+
+  /**
+   * Builds the function from the query arguments. The first argument is the expression to count;
+   * the optional second argument is a literal parameter string such as
+   * {@code 'threshold=10;p=12;sp=25'}. Defaults are used when the second argument is absent.
+   */
+  public DistinctCountSmartHLLPlusAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+
+    // Start from the defaults and override them only when a parameter string was supplied
+    int threshold = Parameters.DEFAULT_THRESHOLD;
+    int p = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_P;
+    int sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP;
+    if (arguments.size() > 1) {
+      Parameters parsed = new Parameters(arguments.get(1).getLiteral().getStringValue());
+      threshold = parsed._threshold;
+      p = parsed._p;
+      sp = parsed._sp;
+    }
+    _threshold = threshold;
+    _p = p;
+    _sp = sp;
+  }
+
+  public int getThreshold() {
+    return _threshold;
+  }
+
+  public int getP() {
+    return _p;
+  }
+
+  public int getSp() {
+    return _sp;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTSMARTHLLPLUS;
+  }
+
+  /**
+   * Aggregates one block of values for the aggregation-only path.
+   *
+   * When the block exposes a dictionary, its dictionary ids are recorded in the bitmap obtained from
+   * {@code getDictIdBitmap}. Otherwise the raw values go to the HyperLogLogPlus if the result holder
+   * already contains one, or to the value set if it does not.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    Dictionary dictionary = blockValSet.getDictionary();
+
+    if (dictionary == null) {
+      // Non-dictionary-encoded expression: route by what the holder currently stores
+      boolean holdsSketch = aggregationResultHolder.getResult() instanceof HyperLogLogPlus;
+      if (holdsSketch) {
+        aggregateIntoHLLPlus(length, aggregationResultHolder, blockValSet);
+      } else {
+        aggregateIntoSet(length, aggregationResultHolder, blockValSet);
+      }
+      return;
+    }
+
+    // Dictionary-encoded expression: only the dictionary ids are collected here
+    RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder, dictionary);
+    if (!blockValSet.isSingleValue()) {
+      int[][] mvDictIds = blockValSet.getDictionaryIdsMV();
+      for (int docIdx = 0; docIdx < length; docIdx++) {
+        dictIdBitmap.add(mvDictIds[docIdx]);
+      }
+      return;
+    }
+    dictIdBitmap.addN(blockValSet.getDictionaryIdsSV(), 0, length);
+  }
+
+  /**
+   * Offers the first {@code length} entries of the block's values to the HyperLogLogPlus held by the
+   * result holder, reading them with the accessor that matches the stored type.
+   *
+   * Single-value blocks handle INT, LONG, FLOAT, DOUBLE, STRING and BYTES; multi-value blocks handle
+   * the same types except BYTES. Any other stored type throws the exception built by
+   * {@code getIllegalDataTypeException}.
+   */
+  private void aggregateIntoHLLPlus(int length, AggregationResultHolder 
aggregationResultHolder,
+                                    BlockValSet blockValSet) {
+    DataType valueType = blockValSet.getValueType();
+    DataType storedType = valueType.getStoredType();
+    // aggregate() only calls this method when the holder's result is a HyperLogLogPlus
+    HyperLogLogPlus hllPlus = aggregationResultHolder.getResult();
+    if (blockValSet.isSingleValue()) {
+      switch (storedType) {
+        case INT:
+          int[] intValues = blockValSet.getIntValuesSV();
+          for (int i = 0; i < length; i++) {
+            hllPlus.offer(intValues[i]);
+          }
+          break;
+        case LONG:
+          long[] longValues = blockValSet.getLongValuesSV();
+          for (int i = 0; i < length; i++) {
+            hllPlus.offer(longValues[i]);
+          }
+          break;
+        case FLOAT:
+          float[] floatValues = blockValSet.getFloatValuesSV();
+          for (int i = 0; i < length; i++) {
+            hllPlus.offer(floatValues[i]);
+          }
+          break;
+        case DOUBLE:
+          double[] doubleValues = blockValSet.getDoubleValuesSV();
+          for (int i = 0; i < length; i++) {
+            hllPlus.offer(doubleValues[i]);
+          }
+          break;
+        case STRING:
+          String[] stringValues = blockValSet.getStringValuesSV();
+          for (int i = 0; i < length; i++) {
+            hllPlus.offer(stringValues[i]);
+          }
+          break;
+        case BYTES:
+          byte[][] bytesValues = blockValSet.getBytesValuesSV();
+          for (int i = 0; i < length; i++) {
+            hllPlus.offer(bytesValues[i]);
+          }
+          break;
+        default:
+          throw getIllegalDataTypeException(valueType, true);
+      }
+    } else {
+      // Multi-value: every element of every row is offered individually
+      switch (storedType) {
+        case INT:
+          int[][] intValues = blockValSet.getIntValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (int value : intValues[i]) {
+              hllPlus.offer(value);
+            }
+          }
+          break;
+        case LONG:
+          long[][] longValues = blockValSet.getLongValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (long value : longValues[i]) {
+              hllPlus.offer(value);
+            }
+          }
+          break;
+        case FLOAT:
+          float[][] floatValues = blockValSet.getFloatValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (float value : floatValues[i]) {
+              hllPlus.offer(value);
+            }
+          }
+          break;
+        case DOUBLE:
+          double[][] doubleValues = blockValSet.getDoubleValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (double value : doubleValues[i]) {
+              hllPlus.offer(value);
+            }
+          }
+          break;
+        case STRING:
+          String[][] stringValues = blockValSet.getStringValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (String value : stringValues[i]) {
+              hllPlus.offer(value);
+            }
+          }
+          break;
+        // No BYTES case for multi-value: it falls through to the illegal-type exception below
+        default:
+          throw getIllegalDataTypeException(valueType, false);
+      }
+    }
+  }
+
+  /**
+   * Builds a HyperLogLogPlus from a value set, choosing the conversion by stored type: a BYTES set
+   * is treated as a set of {@link ByteArray}, any other set has its elements offered as they are.
+   */
+  protected HyperLogLogPlus convertSetToHLLPlus(Set valueSet, DataType storedType) {
+    return storedType == DataType.BYTES
+        ? convertByteArraySetToHLLPlus((ObjectSet<ByteArray>) valueSet)
+        : convertNonByteArraySetToHLLPlus(valueSet);
+  }
+
+  /**
+   * Creates a HyperLogLogPlus with this function's p and sp and offers the raw bytes of every
+   * {@link ByteArray} in the set to it.
+   */
+  protected HyperLogLogPlus convertByteArraySetToHLLPlus(ObjectSet<ByteArray> valueSet) {
+    HyperLogLogPlus sketch = new HyperLogLogPlus(_p, _sp);
+    valueSet.forEach(byteArray -> sketch.offer(byteArray.getBytes()));
+    return sketch;
+  }
+
+  /**
+   * Creates a HyperLogLogPlus with this function's p and sp and offers every element of the set to
+   * it unchanged.
+   */
+  protected HyperLogLogPlus convertNonByteArraySetToHLLPlus(Set valueSet) {
+    HyperLogLogPlus sketch = new HyperLogLogPlus(_p, _sp);
+    valueSet.forEach(sketch::offer);
+    return sketch;
+  }
+
+  /**
+   * Merges two intermediate results, each either a value {@link Set} or a {@link HyperLogLogPlus}.
+   *
+   * A null operand yields the other operand unchanged. If either operand is a HyperLogLogPlus, the
+   * other one is folded into it. Two sets are unioned into the first set, which is converted to a
+   * HyperLogLogPlus once its size exceeds the threshold.
+   */
+  @Override
+  public Object merge(@Nullable Object intermediateResult1, @Nullable Object intermediateResult2) {
+    if (intermediateResult1 == null) {
+      return intermediateResult2;
+    } else if (intermediateResult2 == null) {
+      return intermediateResult1;
+    }
+
+    // A sketch on either side absorbs the other operand
+    if (intermediateResult1 instanceof HyperLogLogPlus) {
+      return mergeIntoHLLPlus((HyperLogLogPlus) intermediateResult1, intermediateResult2);
+    } else if (intermediateResult2 instanceof HyperLogLogPlus) {
+      return mergeIntoHLLPlus((HyperLogLogPlus) intermediateResult2, intermediateResult1);
+    }
+
+    // Both operands are sets from here on
+    Set left = (Set) intermediateResult1;
+    Set right = (Set) intermediateResult2;
+    if (left.isEmpty()) {
+      return right;
+    }
+    if (right.isEmpty()) {
+      return left;
+    }
+    left.addAll(right);
+    if (left.size() <= _threshold) {
+      return left;
+    }
+
+    // The union has outgrown the threshold: switch to the approximate representation
+    boolean holdsByteArrays = left instanceof ObjectSet && left.iterator().next() instanceof ByteArray;
+    return holdsByteArrays
+        ? convertByteArraySetToHLLPlus((ObjectSet<ByteArray>) left)
+        : convertNonByteArraySetToHLLPlus(left);
+  }
+
+  /**
+   * Folds an intermediate result into the given HyperLogLogPlus and returns that same instance.
+   * Another HyperLogLogPlus is combined through {@code addAll}; a set has its elements offered one
+   * by one, with {@link ByteArray} elements offered as their raw bytes.
+   */
+  private static HyperLogLogPlus mergeIntoHLLPlus(HyperLogLogPlus hllPlus, Object intermediateResult) {
+    if (!(intermediateResult instanceof HyperLogLogPlus)) {
+      Set values = (Set) intermediateResult;
+      if (values.isEmpty()) {
+        return hllPlus;
+      }
+      // Inspect the first element to decide whether the set carries ByteArray wrappers
+      boolean holdsByteArrays = values instanceof ObjectSet && values.iterator().next() instanceof ByteArray;
+      for (Object value : values) {
+        hllPlus.offer(holdsByteArrays ? ((ByteArray) value).getBytes() : value);
+      }
+      return hllPlus;
+    }
+
+    try {
+      hllPlus.addAll((HyperLogLogPlus) intermediateResult);
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while merging HyperLogLogPlus", e);
+    }
+    return hllPlus;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public SerializedIntermediateResult serializeIntermediateResult(Object o) {
+    if (o instanceof HyperLogLogPlus) {
+      return new 
SerializedIntermediateResult(ObjectSerDeUtils.ObjectType.HyperLogLogPlus.getValue(),
+          
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize((HyperLogLogPlus) o));
+    }
+    return BaseDistinctAggregateAggregationFunction.serializeSet((Set) o);
+  }
+
+  @Override
+  public Object deserializeIntermediateResult(@Nullable CustomObject 
customObject) {
+    return ObjectSerDeUtils.deserialize(customObject);
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.INT;
+  }
+
+  /**
+   * Turns an intermediate result into the final count: the estimated cardinality for a
+   * HyperLogLogPlus (narrowed to int), the size for a set, and 0 for null.
+   */
+  @Override
+  public Integer extractFinalResult(@Nullable Object intermediateResult) {
+    if (intermediateResult instanceof HyperLogLogPlus) {
+      return (int) ((HyperLogLogPlus) intermediateResult).cardinality();
+    }
+    // instanceof is false for null, so null reaches this point and maps to 0
+    return intermediateResult != null ? ((Set) intermediateResult).size() : 0;
+  }
+
+  /**
+   * Combines two final results by adding them; a null operand counts as absent, and two nulls
+   * give 0.
+   */
+  @Override
+  public Integer mergeFinalResult(@Nullable Integer finalResult1, @Nullable Integer finalResult2) {
+    if (finalResult1 != null && finalResult2 != null) {
+      return finalResult1 + finalResult2;
+    }
+    if (finalResult1 != null) {
+      return finalResult1;
+    }
+    return finalResult2 != null ? finalResult2 : 0;
+  }
+
+  /**
+   * Resolves every dictionary id recorded in the wrapper's bitmap through the wrapper's dictionary
+   * and offers the resulting values to a new HyperLogLogPlus built with this function's p and sp.
+   */
+  private HyperLogLogPlus convertToHLLPlus(
+          BaseDistinctCountSmartSketchAggregationFunction.DictIdsWrapper dictIdsWrapper) {
+    Dictionary valueLookup = dictIdsWrapper._dictionary;
+    RoaringBitmap recordedIds = dictIdsWrapper._dictIdBitmap;
+    HyperLogLogPlus sketch = new HyperLogLogPlus(_p, _sp);
+    PeekableIntIterator dictIds = recordedIds.getIntIterator();
+    while (dictIds.hasNext()) {
+      int dictId = dictIds.next();
+      sketch.offer(valueLookup.get(dictId));
+    }
+    return sketch;
+  }
+
+  @Override
+  protected Object convertSetToSketch(Set valueSet, DataType storedType) {
+    return convertSetToHLLPlus(valueSet, storedType);
+  }
+
+  @Override
+  protected Object 
convertToSketch(BaseDistinctCountSmartSketchAggregationFunction.DictIdsWrapper 
dictIdsWrapper) {
+    return convertToHLLPlus(dictIdsWrapper);
+  }
+
+  @Override
+  protected IllegalStateException getIllegalDataTypeException(DataType 
dataType, boolean singleValue) {
+    return new IllegalStateException(
+        "Illegal data type for DISTINCT_COUNT_SMART_HLL_PLUS aggregation 
function: " + dataType + (singleValue ? ""
+            : "_MV"));
+  }
+
+  /**
+   * Helper class to wrap the parameters.
+   *
+   * Parses a string of the form {@code key=value;key=value}, for example
+   * {@code threshold=10;p=12;sp=25}. Keys are matched case-insensitively and whitespace anywhere in
+   * the string is ignored. Keys that are not given keep their default values.
+   */
+  private static class Parameters {
+    static final char PARAMETER_DELIMITER = ';';
+    static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
+
+    static final String THRESHOLD_KEY = "THRESHOLD";
+    // 100K values to trigger HLLPlus conversion by default
+    static final int DEFAULT_THRESHOLD = 100_000;
+
+    static final String P_KEY = "P";
+    static final String SP_KEY = "SP";
+
+    int _threshold = DEFAULT_THRESHOLD;
+    int _p = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_P;
+    int _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP;
+
+    /**
+     * @param parametersString the raw parameter literal taken from the query
+     * @throws IllegalArgumentException if an entry is not exactly one key=value pair, if the key is
+     *         unknown, or (as NumberFormatException) if a value is not an integer
+     */
+    Parameters(String parametersString) {
+      // Strings are immutable: deleteWhitespace returns the stripped copy, so its result must be
+      // used. Discarding it left inputs such as 'threshold = 10' unparseable.
+      String normalized = StringUtils.deleteWhitespace(parametersString);
+      String[] keyValuePairs = StringUtils.split(normalized, PARAMETER_DELIMITER);
+      for (String keyValuePair : keyValuePairs) {
+        String[] keyAndValue = StringUtils.split(keyValuePair, PARAMETER_KEY_VALUE_SEPARATOR);
+        Preconditions.checkArgument(keyAndValue.length == 2, "Invalid parameter: %s", keyValuePair);
+        String key = keyAndValue[0];
+        String value = keyAndValue[1];
+        switch (key.toUpperCase()) {
+          case THRESHOLD_KEY:
+            _threshold = Integer.parseInt(value);
+            // Treat non-positive threshold as unlimited
+            if (_threshold <= 0) {
+              _threshold = Integer.MAX_VALUE;
+            }
+            break;
+          case P_KEY:
+            _p = Integer.parseInt(value);
+            break;
+          case SP_KEY:
+            _sp = Integer.parseInt(value);
+            break;
+          default:
+            throw new IllegalArgumentException("Invalid parameter key: " + key);
+        }
+      }
+    }
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
index be1666a9b6a..f3bac1959f9 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.queries;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -521,6 +522,92 @@ public class DistinctCountQueriesTest extends 
BaseQueriesTest {
     assertEquals(function.getLog2m(), 8);
   }
 
+  @Test
+  public void testSmartHLLPlus() {
+    // Dictionary based
+    String query = "SELECT DISTINCTCOUNTSMARTHLLPLUS(intColumn, 
'threshold=10'), "
+            + "DISTINCTCOUNTSMARTHLLPLUS(longColumn, 'threshold=10'), "
+            + "DISTINCTCOUNTSMARTHLLPLUS(floatColumn, 'threshold=10'), "
+            + "DISTINCTCOUNTSMARTHLLPLUS(doubleColumn, 'threshold=10'), "
+            + "DISTINCTCOUNTSMARTHLLPLUS(stringColumn, 'threshold=10'), "
+            + "DISTINCTCOUNTSMARTHLLPLUS(bytesColumn, 'threshold=10') FROM 
testTable";
+
+    // Inner segment
+    Object[] interSegmentsExpectedResults = new Object[6];
+    for (Object operator : Arrays.asList(getOperator(query), 
getOperatorWithFilter(query))) {
+      assertTrue(operator instanceof NonScanBasedAggregationOperator);
+      AggregationResultsBlock resultsBlock = 
((NonScanBasedAggregationOperator) operator).nextBlock();
+      QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) 
operator).getExecutionStatistics(), NUM_RECORDS,
+              0, 0, NUM_RECORDS);
+      List<Object> aggregationResult = resultsBlock.getResults();
+      assertNotNull(aggregationResult);
+      assertEquals(aggregationResult.size(), 6);
+      for (int i = 0; i < 6; i++) {
+        assertTrue(aggregationResult.get(i) instanceof HyperLogLogPlus);
+        HyperLogLogPlus hll = (HyperLogLogPlus) aggregationResult.get(i);
+
+        // Check precision is 14
+        assertEquals(hll.sizeof(), 10924);
+
+        int actualResult = (int) hll.cardinality();
+        int expectedResult = _values.size();
+        // The standard deviation of the error for p 14 is 0.81%, allow 2% 
error
+        assertEquals(actualResult, expectedResult, expectedResult * 0.02);
+
+        interSegmentsExpectedResults[i] = actualResult;
+      }
+    }
+
+    // Inter segments
+    for (BrokerResponseNative brokerResponse : 
Arrays.asList(getBrokerResponse(query),
+            getBrokerResponseWithFilter(query))) {
+      QueriesTestUtils.testInterSegmentsResult(brokerResponse, 4 * 
NUM_RECORDS, 0, 0, 4 * NUM_RECORDS,
+              interSegmentsExpectedResults);
+    }
+
+    // Regular aggregation
+    query = query + " WHERE intColumn >= 500";
+
+    // Inner segment
+    int expectedResult = 0;
+    for (Integer value : _values) {
+      if (value >= 500) {
+        expectedResult++;
+      }
+    }
+    AggregationOperator aggregationOperator = getOperator(query);
+    List<Object> aggregationResult = 
aggregationOperator.nextBlock().getResults();
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 6);
+    for (int i = 0; i < 6; i++) {
+      assertTrue(aggregationResult.get(i) instanceof HyperLogLogPlus);
+      HyperLogLogPlus hll = (HyperLogLogPlus) aggregationResult.get(i);
+
+      // Check precision is 14
+      assertEquals(hll.sizeof(), 10924);
+
+      int actualResult = (int) hll.cardinality();
+      // The standard deviation of the error for p 14 is 0.81%, allow 2% error
+      assertEquals(actualResult, expectedResult, expectedResult * 0.02);
+
+      interSegmentsExpectedResults[i] = actualResult;
+    }
+
+    // Inter segments
+    QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(query), 
interSegmentsExpectedResults);
+
+    // Change precision
+    query = "SELECT DISTINCTCOUNTSMARTHLLPLUS(intColumn, 'threshold=10;p=12') 
FROM testTable";
+    NonScanBasedAggregationOperator nonScanOperator = getOperator(query);
+    aggregationResult = nonScanOperator.nextBlock().getResults();
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 1);
+    assertTrue(aggregationResult.get(0) instanceof HyperLogLogPlus);
+    HyperLogLogPlus hll = (HyperLogLogPlus) aggregationResult.get(0);
+    // Check precision is 12
+    assertEquals(hll.sizeof(), 2732);
+  }
+
   @Test
   public void testSmartULL() {
     String query = "SELECT DISTINCTCOUNTSMARTULL(intColumn, 'threshold=10'), "
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 93f6b2755fd..91b8fd9b1b9 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -90,6 +90,8 @@ public enum AggregationFunctionType {
       OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER), i 
-> i == 1), SqlTypeName.OTHER),
   DISTINCTCOUNTSMARTHLL("distinctCountSmartHLL", ReturnTypes.BIGINT,
       OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER), 
i -> i == 1), SqlTypeName.OTHER),
+  DISTINCTCOUNTSMARTHLLPLUS("distinctCountSmartHLLPlus", ReturnTypes.BIGINT,
+      OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER), 
i -> i == 1), SqlTypeName.OTHER),
   @Deprecated FASTHLL("fastHLL"),
   DISTINCTCOUNTHLLPLUS("distinctCountHLLPlus", ReturnTypes.BIGINT,
       OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER), i 
-> i == 1), SqlTypeName.OTHER),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to