This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3501b869c7 Add hyperLogLogPlus aggregation function for distinct count (#11346)
3501b869c7 is described below

commit 3501b869c7ece2ddd088daf34c68a15ad6740a3a
Author: deemoliu <qiao...@uber.com>
AuthorDate: Thu Sep 21 16:48:17 2023 -0700

    Add hyperLogLogPlus aggregation function for distinct count (#11346)
    
    * Add hyperLogLogPlus aggregation function for distinct count
    
    * address code comments
    
    * address code comments
---
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  36 +-
 .../query/NonScanBasedAggregationOperator.java     |  46 +-
 .../pinot/core/plan/AggregationPlanNode.java       |   3 +-
 .../function/AggregationFunctionFactory.java       |   8 +
 .../DistinctCountHLLPlusAggregationFunction.java   | 471 +++++++++++++++++++++
 .../DistinctCountHLLPlusMVAggregationFunction.java | 265 ++++++++++++
 ...DistinctCountRawHLLPlusAggregationFunction.java | 115 +++++
 ...stinctCountRawHLLPlusMVAggregationFunction.java |  36 ++
 .../function/AggregationFunctionFactoryTest.java   |  24 ++
 ...terSegmentAggregationMultiValueQueriesTest.java |  75 ++++
 ...SegmentAggregationMultiValueRawQueriesTest.java |  75 ++++
 ...erSegmentAggregationSingleValueQueriesTest.java |  60 +++
 .../pinot/queries/SerializedBytesQueriesTest.java  | 121 +++++-
 .../tests/OfflineClusterIntegrationTest.java       |  28 +-
 .../DistinctCountHLLPlusValueAggregator.java       | 125 ++++++
 .../local/aggregator/ValueAggregatorFactory.java   |   6 +
 .../local/customobject/SerializedHLLPlus.java      |  42 ++
 .../segment/local/utils/CustomSerDeUtils.java      |  29 ++
 .../segment/local/utils/HyperLogLogPlusUtils.java  |  43 ++
 .../pinot/segment/spi/AggregationFunctionType.java |  16 +
 .../apache/pinot/spi/utils/CommonConstants.java    |   2 +
 21 files changed, 1613 insertions(+), 13 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 69c00ea5b8..4f05bad761 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.common;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.RegisterSet;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -139,7 +140,9 @@ public class ObjectSerDeUtils {
     KllDataSketch(36),
     IntegerTupleSketch(37),
     FrequentStringsSketch(38),
-    FrequentLongsSketch(39);
+    FrequentLongsSketch(39),
+    HyperLogLogPlus(40);
+
 
     private final int _value;
 
@@ -235,6 +238,8 @@ public class ObjectSerDeUtils {
         return ObjectType.FrequentStringsSketch;
       } else if (value instanceof LongsSketch) {
         return ObjectType.FrequentLongsSketch;
+      } else if (value instanceof HyperLogLogPlus) {
+        return ObjectType.HyperLogLogPlus;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + 
value.getClass().getSimpleName());
       }
@@ -563,6 +568,34 @@ public class ObjectSerDeUtils {
     }
   };
 
+  public static final ObjectSerDe<HyperLogLogPlus> HYPER_LOG_LOG_PLUS_SER_DE = new ObjectSerDe<HyperLogLogPlus>() {
+    // SerDe for HyperLogLogPlus using the library's own format: getBytes() to write, Builder.build(bytes) to read.
+    @Override
+    public byte[] serialize(HyperLogLogPlus hyperLogLogPlus) {
+      try {
+        return hyperLogLogPlus.getBytes();
+      } catch (IOException e) {
+        throw new RuntimeException("Caught exception while serializing HyperLogLogPlus", e);
+      }
+    }
+
+    @Override
+    public HyperLogLogPlus deserialize(byte[] bytes) {
+      try {
+        return HyperLogLogPlus.Builder.build(bytes);
+      } catch (IOException e) {
+        throw new RuntimeException("Caught exception while deserializing HyperLogLogPlus", e);
+      }
+    }
+
+    @Override
+    public HyperLogLogPlus deserialize(ByteBuffer byteBuffer) {
+      byte[] bytes = new byte[byteBuffer.remaining()];
+      byteBuffer.get(bytes);
+      return deserialize(bytes);
+    }
+  };
+
   public static final ObjectSerDe<DistinctTable> DISTINCT_TABLE_SER_DE = new 
ObjectSerDe<DistinctTable>() {
 
     @Override
@@ -1377,6 +1410,7 @@ public class ObjectSerDeUtils {
       DATA_SKETCH_INT_TUPLE_SER_DE,
       FREQUENT_STRINGS_SKETCH_SER_DE,
       FREQUENT_LONGS_SKETCH_SER_DE,
+      HYPER_LOG_LOG_PLUS_SER_DE,
   };
   //@formatter:on
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
index 2060135d38..fb6e8d02f6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.query;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
 import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -36,7 +37,9 @@ import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountHLLAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.DistinctCountHLLPlusAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLPlusAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
@@ -50,8 +53,8 @@ import org.apache.pinot.spi.utils.ByteArray;
  * Aggregation operator that utilizes dictionary or column metadata for 
serving aggregation queries to avoid scanning.
  * The scanless operator is selected in the plan maker, if the query is of 
aggregation type min, max, minmaxrange,
  * distinctcount, distinctcounthll, distinctcountrawhll, 
segmentpartitioneddistinctcount, distinctcountsmarthll,
- * and the column has a dictionary, or has column metadata with min and max 
value defined. It also supports count(*) if
- * the query has no filter.
+ * distinctcounthllplus, distinctcountrawhllplus, and the column has a 
dictionary, or has column metadata with min and
+ * max value defined. It also supports count(*) if the query has no filter.
  * We don't use this operator if the segment has star tree,
  * as the dictionary will have aggregated values for the metrics, and 
dimensions will have star node value.
  *
@@ -118,6 +121,17 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
           result = 
getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
               ((DistinctCountRawHLLAggregationFunction) 
aggregationFunction).getDistinctCountHLLAggregationFunction());
           break;
+        case DISTINCTCOUNTHLLPLUS:
+        case DISTINCTCOUNTHLLPLUSMV:
+          result = 
getDistinctCountHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()),
+              (DistinctCountHLLPlusAggregationFunction) aggregationFunction);
+          break;
+        case DISTINCTCOUNTRAWHLLPLUS:
+        case DISTINCTCOUNTRAWHLLPLUSMV:
+          result = 
getDistinctCountHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()),
+              ((DistinctCountRawHLLPlusAggregationFunction) 
aggregationFunction)
+                  .getDistinctCountHLLPlusAggregationFunction());
+          break;
         case SEGMENTPARTITIONEDDISTINCTCOUNT:
           result = (long) 
Objects.requireNonNull(dataSource.getDictionary()).length();
           break;
@@ -215,6 +229,15 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
     return hll;
   }
 
+  private static HyperLogLogPlus getDistinctValueHLLPlus(Dictionary 
dictionary, int p, int sp) {
+    HyperLogLogPlus hllPlus = new HyperLogLogPlus(p, sp);
+    int length = dictionary.length();
+    for (int i = 0; i < length; i++) {
+      hllPlus.offer(dictionary.get(i));
+    }
+    return hllPlus;
+  }
+
   private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary,
       DistinctCountHLLAggregationFunction function) {
     if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
@@ -234,6 +257,25 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
     }
   }
 
+  private static HyperLogLogPlus getDistinctCountHLLPlusResult(Dictionary 
dictionary,
+      DistinctCountHLLPlusAggregationFunction function) {
+    if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+      // Treat BYTES value as serialized HyperLogLogPlus
+      try {
+        HyperLogLogPlus hllplus = 
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(0));
+        int length = dictionary.length();
+        for (int i = 1; i < length; i++) {
+          
hllplus.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(i)));
+        }
+        return hllplus;
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging 
HyperLogLogPluses", e);
+      }
+    } else {
+      return getDistinctValueHLLPlus(dictionary, function.getP(), 
function.getSp());
+    }
+  }
+
   private static Object getDistinctCountSmartHLLResult(Dictionary dictionary,
       DistinctCountSmartHLLAggregationFunction function) {
     if (dictionary.length() > function.getThreshold()) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 2f53b0d238..95a61c10be 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -56,7 +56,8 @@ public class AggregationPlanNode implements PlanNode {
   private static final EnumSet<AggregationFunctionType> 
DICTIONARY_BASED_FUNCTIONS =
       EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, 
DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTCOUNTHLL,
           DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, 
SEGMENTPARTITIONEDDISTINCTCOUNT,
-          DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV, 
DISTINCTAVGMV);
+          DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV, 
DISTINCTAVGMV, DISTINCTCOUNTHLLPLUS,
+          DISTINCTCOUNTHLLPLUSMV, DISTINCTCOUNTRAWHLLPLUS, 
DISTINCTCOUNTRAWHLLPLUSMV);
 
   // DISTINCTCOUNT excluded because consuming segment metadata contains 
unknown cardinality when there is no dictionary
   private static final EnumSet<AggregationFunctionType> 
METADATA_BASED_FUNCTIONS =
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 2d69c093f2..c426ed472f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -298,6 +298,14 @@ public class AggregationFunctionFactory {
             return new DistinctCountHLLMVAggregationFunction(arguments);
           case DISTINCTCOUNTRAWHLLMV:
             return new DistinctCountRawHLLMVAggregationFunction(arguments);
+          case DISTINCTCOUNTHLLPLUS:
+            return new DistinctCountHLLPlusAggregationFunction(arguments);
+          case DISTINCTCOUNTRAWHLLPLUS:
+            return new DistinctCountRawHLLPlusAggregationFunction(arguments);
+          case DISTINCTCOUNTHLLPLUSMV:
+            return new DistinctCountHLLPlusMVAggregationFunction(arguments);
+          case DISTINCTCOUNTRAWHLLPLUSMV:
+            return new DistinctCountRawHLLPlusMVAggregationFunction(arguments);
           case DISTINCTSUMMV:
             return new DistinctSumMVAggregationFunction(arguments);
           case DISTINCTAVGMV:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusAggregationFunction.java
new file mode 100644
index 0000000000..2ca7d4eec3
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusAggregationFunction.java
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+public class DistinctCountHLLPlusAggregationFunction extends BaseSingleInputAggregationFunction<HyperLogLogPlus, Long> {
+  // "p": precision of the normal (dense) register set of the HyperLogLogPlus, which uses 2^p registers.
+  protected final int _p;
+  // "sp": precision of the sparse set of the HyperLogLogPlus (passed as the second constructor argument).
+  protected final int _sp;
+
+  public DistinctCountHLLPlusAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+    int numExpressions = arguments.size();
+    // Accepts 1 to 3 arguments: expression [, p [, sp]]; omitted p/sp fall back to CommonConstants.Helix defaults.
+    Preconditions.checkArgument(numExpressions <= 3, "DistinctCountHLLPlus expects 1, 2 or 3 arguments, got: %s",
+        numExpressions);
+    if (numExpressions == 2) {
+      _p = arguments.get(1).getLiteral().getIntValue();
+      _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP;
+    } else if (numExpressions == 3) {
+      _p = arguments.get(1).getLiteral().getIntValue();
+      _sp = arguments.get(2).getLiteral().getIntValue();
+    } else {
+      _p = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_P;
+      _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP;
+    }
+  }
+
+  public int getP() {
+    return _p;
+  }
+
+  public int getSp() {
+    return _sp;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTHLLPLUS;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized HyperLogLogPlus
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        HyperLogLogPlus hyperLogLogPlus = aggregationResultHolder.getResult();
+        if (hyperLogLogPlus == null) {
+          hyperLogLogPlus = 
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[0]);
+          aggregationResultHolder.setValue(hyperLogLogPlus);
+        } else {
+          
hyperLogLogPlus.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[0]));
+        }
+        for (int i = 1; i < length; i++) {
+          
hyperLogLogPlus.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i]));
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging 
HyperLogLogPlus", e);
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      getDictIdBitmap(aggregationResultHolder, dictionary).addN(dictIds, 0, 
length);
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the 
HyperLogLogPlus
+    HyperLogLogPlus hyperLogLogPlus = 
getHyperLogLogPlus(aggregationResultHolder);
+    switch (storedType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          hyperLogLogPlus.offer(intValues[i]);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          hyperLogLogPlus.offer(longValues[i]);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          hyperLogLogPlus.offer(floatValues[i]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          hyperLogLogPlus.offer(doubleValues[i]);
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          hyperLogLogPlus.offer(stringValues[i]);
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_HLL_PLUS aggregation 
function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized HyperLogLogPlus
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        for (int i = 0; i < length; i++) {
+          HyperLogLogPlus value = 
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i]);
+          int groupKey = groupKeyArray[i];
+          HyperLogLogPlus hyperLogLogPlus = 
groupByResultHolder.getResult(groupKey);
+          if (hyperLogLogPlus != null) {
+            hyperLogLogPlus.addAll(value);
+          } else {
+            groupByResultHolder.setValueForKey(groupKey, value);
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging 
HyperLogLogPlus", e);
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        getDictIdBitmap(groupByResultHolder, groupKeyArray[i], 
dictionary).add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the 
HyperLogLogPlus
+    switch (storedType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          getHyperLogLogPlus(groupByResultHolder, 
groupKeyArray[i]).offer(intValues[i]);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          getHyperLogLogPlus(groupByResultHolder, 
groupKeyArray[i]).offer(longValues[i]);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          getHyperLogLogPlus(groupByResultHolder, 
groupKeyArray[i]).offer(floatValues[i]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          getHyperLogLogPlus(groupByResultHolder, 
groupKeyArray[i]).offer(doubleValues[i]);
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          getHyperLogLogPlus(groupByResultHolder, 
groupKeyArray[i]).offer(stringValues[i]);
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_HLL_PLUS aggregation 
function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized HyperLogLogPlus
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        for (int i = 0; i < length; i++) {
+          HyperLogLogPlus value = 
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i]);
+          for (int groupKey : groupKeysArray[i]) {
+            HyperLogLogPlus hyperLogLogPlus = 
groupByResultHolder.getResult(groupKey);
+            if (hyperLogLogPlus != null) {
+              hyperLogLogPlus.addAll(value);
+            } else {
+              // Create a new HyperLogLogPlus for the group
+              groupByResultHolder.setValueForKey(groupKey,
+                  
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i]));
+            }
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging 
HyperLogLogPlus", e);
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        setDictIdForGroupKeys(groupByResultHolder, groupKeysArray[i], 
dictionary, dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the 
HyperLogLogPlus
+    switch (storedType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
intValues[i]);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
longValues[i]);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
floatValues[i]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
doubleValues[i]);
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
stringValues[i]);
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_HLL_PLUS aggregation 
function: " + storedType);
+    }
+  }
+
+  @Override
+  public HyperLogLogPlus extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    Object result = aggregationResultHolder.getResult();
+    if (result == null) {
+      return new HyperLogLogPlus(_p, _sp);
+    }
+
+    if (result instanceof DictIdsWrapper) {
+      // For dictionary-encoded expression, convert dictionary ids to 
HyperLogLogPlus
+      return convertToHyperLogLogPlus((DictIdsWrapper) result);
+    } else {
+      // For non-dictionary-encoded expression, directly return the 
HyperLogLogPlus
+      return (HyperLogLogPlus) result;
+    }
+  }
+
+  @Override
+  public HyperLogLogPlus extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+    Object result = groupByResultHolder.getResult(groupKey);
+    if (result == null) {
+      return new HyperLogLogPlus(_p, _sp);
+    }
+
+    if (result instanceof DictIdsWrapper) {
+      // For dictionary-encoded expression, convert dictionary ids to 
HyperLogLogPlus
+      return convertToHyperLogLogPlus((DictIdsWrapper) result);
+    } else {
+      // For non-dictionary-encoded expression, directly return the 
HyperLogLogPlus
+      return (HyperLogLogPlus) result;
+    }
+  }
+
+  @Override
+  public HyperLogLogPlus merge(HyperLogLogPlus intermediateResult1, 
HyperLogLogPlus intermediateResult2) {
+    // Can happen when aggregating serialized HyperLogLogPlus with non-default 
p, sp values
+    if (intermediateResult1.sizeof() != intermediateResult2.sizeof()) {
+      if (intermediateResult1.cardinality() == 0) {
+        return intermediateResult2;
+      } else {
+        Preconditions.checkState(intermediateResult2.cardinality() == 0,
+            "Cannot merge HyperLogLogPlus of different sizes");
+        return intermediateResult1;
+      }
+    }
+    try {
+      intermediateResult1.addAll(intermediateResult2);
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while merging 
HyperLogLogPlus", e);
+    }
+    return intermediateResult1;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Long extractFinalResult(HyperLogLogPlus intermediateResult) {
+    return intermediateResult.cardinality();
+  }
+
+  /**
+   * Returns the dictionary id bitmap from the result holder or creates a new 
one if it does not exist.
+   */
+  protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder 
aggregationResultHolder,
+      Dictionary dictionary) {
+    DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
+    if (dictIdsWrapper == null) {
+      dictIdsWrapper = new DictIdsWrapper(dictionary);
+      aggregationResultHolder.setValue(dictIdsWrapper);
+    }
+    return dictIdsWrapper._dictIdBitmap;
+  }
+
+  /**
+   * Returns the HyperLogLogPlus from the result holder or creates a new one 
if it does not exist.
+   */
+  protected HyperLogLogPlus getHyperLogLogPlus(AggregationResultHolder 
aggregationResultHolder) {
+    HyperLogLogPlus hyperLogLogPlus = aggregationResultHolder.getResult();
+    if (hyperLogLogPlus == null) {
+      hyperLogLogPlus = new HyperLogLogPlus(_p, _sp);
+      aggregationResultHolder.setValue(hyperLogLogPlus);
+    }
+    return hyperLogLogPlus;
+  }
+
+  /**
+   * Returns the dictionary id bitmap for the given group key or creates a new 
one if it does not exist.
+   */
+  protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder 
groupByResultHolder, int groupKey,
+      Dictionary dictionary) {
+    DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
+    if (dictIdsWrapper == null) {
+      dictIdsWrapper = new DictIdsWrapper(dictionary);
+      groupByResultHolder.setValueForKey(groupKey, dictIdsWrapper);
+    }
+    return dictIdsWrapper._dictIdBitmap;
+  }
+
+  /**
+   * Returns the HyperLogLogPlus for the given group key or creates a new one 
if it does not exist.
+   */
+  protected HyperLogLogPlus getHyperLogLogPlus(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+    HyperLogLogPlus hyperLogLogPlus = groupByResultHolder.getResult(groupKey);
+    if (hyperLogLogPlus == null) {
+      hyperLogLogPlus = new HyperLogLogPlus(_p, _sp);
+      groupByResultHolder.setValueForKey(groupKey, hyperLogLogPlus);
+    }
+    return hyperLogLogPlus;
+  }
+
+  /**
+   * Helper method to set dictionary id for the given group keys into the 
result holder.
+   */
+  private static void setDictIdForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys,
+      Dictionary dictionary, int dictId) {
+    for (int groupKey : groupKeys) {
+      getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId);
+    }
+  }
+
+  /**
+   * Helper method to set value for the given group keys into the result 
holder.
+   */
+  private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, 
int[] groupKeys, Object value) {
+    for (int groupKey : groupKeys) {
+      getHyperLogLogPlus(groupByResultHolder, groupKey).offer(value);
+    }
+  }
+
+  /**
+   * Helper method to read dictionary and convert dictionary ids to 
HyperLogLogPlus for dictionary-encoded expression.
+   */
+  private HyperLogLogPlus convertToHyperLogLogPlus(DictIdsWrapper 
dictIdsWrapper) {
+    HyperLogLogPlus hyperLogLogPlus = new HyperLogLogPlus(_p, _sp);
+    Dictionary dictionary = dictIdsWrapper._dictionary;
+    RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
+    PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
+    while (iterator.hasNext()) {
+      hyperLogLogPlus.offer(dictionary.get(iterator.next()));
+    }
+    return hyperLogLogPlus;
+  }
+
+  private static final class DictIdsWrapper {
+    final Dictionary _dictionary;
+    final RoaringBitmap _dictIdBitmap;
+
+    private DictIdsWrapper(Dictionary dictionary) {
+      _dictionary = dictionary;
+      _dictIdBitmap = new RoaringBitmap();
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusMVAggregationFunction.java
new file mode 100644
index 0000000000..00abb1f5d2
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusMVAggregationFunction.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Multi-value variant of {@link DistinctCountHLLPlusAggregationFunction}.
+ *
+ * <p>For dictionary-encoded input, the dictionary ids of every value in each multi-value entry are collected into a
+ * bitmap; otherwise every value of each entry is offered directly to the HyperLogLogPlus sketch.
+ */
+public class DistinctCountHLLPlusMVAggregationFunction extends DistinctCountHLLPlusAggregationFunction {
+
+  public DistinctCountHLLPlusMVAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTHLLPLUSMV;
+  }
+
+  /**
+   * Aggregates every value of the first {@code length} multi-value entries into the single result holder.
+   *
+   * @throws IllegalStateException if the stored type of a non-dictionary-encoded expression is not
+   *         INT/LONG/FLOAT/DOUBLE/STRING
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder, dictionary);
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        dictIdBitmap.add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the HyperLogLogPlus
+    HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(aggregationResultHolder);
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    switch (storedType) {
+      case INT:
+        int[][] intValuesArray = blockValSet.getIntValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int value : intValuesArray[i]) {
+            hyperLogLogPlus.offer(value);
+          }
+        }
+        break;
+      case LONG:
+        long[][] longValuesArray = blockValSet.getLongValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (long value : longValuesArray[i]) {
+            hyperLogLogPlus.offer(value);
+          }
+        }
+        break;
+      case FLOAT:
+        float[][] floatValuesArray = blockValSet.getFloatValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (float value : floatValuesArray[i]) {
+            hyperLogLogPlus.offer(value);
+          }
+        }
+        break;
+      case DOUBLE:
+        double[][] doubleValuesArray = blockValSet.getDoubleValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (double value : doubleValuesArray[i]) {
+            hyperLogLogPlus.offer(value);
+          }
+        }
+        break;
+      case STRING:
+        String[][] stringValuesArray = blockValSet.getStringValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (String value : stringValuesArray[i]) {
+            hyperLogLogPlus.offer(value);
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_HLL_PLUS_MV aggregation function: " + storedType);
+    }
+  }
+
+  /**
+   * Aggregates every value of the first {@code length} multi-value entries into the group identified by the matching
+   * entry of {@code groupKeyArray}.
+   *
+   * @throws IllegalStateException if the stored type of a non-dictionary-encoded expression is not
+   *         INT/LONG/FLOAT/DOUBLE/STRING
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        getDictIdBitmap(groupByResultHolder, groupKeyArray[i], dictionary).add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the HyperLogLogPlus
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    switch (storedType) {
+      case INT:
+        int[][] intValuesArray = blockValSet.getIntValuesMV();
+        for (int i = 0; i < length; i++) {
+          HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]);
+          for (int value : intValuesArray[i]) {
+            hyperLogLogPlus.offer(value);
+          }
+        }
+        break;
+      case LONG:
+        long[][] longValuesArray = blockValSet.getLongValuesMV();
+        for (int i = 0; i < length; i++) {
+          HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]);
+          for (long value : longValuesArray[i]) {
+            hyperLogLogPlus.offer(value);
+          }
+        }
+        break;
+      case FLOAT:
+        float[][] floatValuesArray = blockValSet.getFloatValuesMV();
+        for (int i = 0; i < length; i++) {
+          HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]);
+          for (float value : floatValuesArray[i]) {
+            hyperLogLogPlus.offer(value);
+          }
+        }
+        break;
+      case DOUBLE:
+        double[][] doubleValuesArray = blockValSet.getDoubleValuesMV();
+        for (int i = 0; i < length; i++) {
+          HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]);
+          for (double value : doubleValuesArray[i]) {
+            hyperLogLogPlus.offer(value);
+          }
+        }
+        break;
+      case STRING:
+        String[][] stringValuesArray = blockValSet.getStringValuesMV();
+        for (int i = 0; i < length; i++) {
+          HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]);
+          for (String value : stringValuesArray[i]) {
+            hyperLogLogPlus.offer(value);
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_HLL_PLUS_MV aggregation function: " + storedType);
+    }
+  }
+
+  /**
+   * Aggregates every value of the first {@code length} multi-value entries into each group listed in the matching
+   * entry of {@code groupKeysArray}.
+   *
+   * @throws IllegalStateException if the stored type of a non-dictionary-encoded expression is not
+   *         INT/LONG/FLOAT/DOUBLE/STRING
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictIds[i]);
+        }
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the HyperLogLogPlus
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    switch (storedType) {
+      case INT:
+        int[][] intValuesArray = blockValSet.getIntValuesMV();
+        for (int i = 0; i < length; i++) {
+          int[] intValues = intValuesArray[i];
+          for (int groupKey : groupKeysArray[i]) {
+            HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKey);
+            for (int value : intValues) {
+              hyperLogLogPlus.offer(value);
+            }
+          }
+        }
+        break;
+      case LONG:
+        long[][] longValuesArray = blockValSet.getLongValuesMV();
+        for (int i = 0; i < length; i++) {
+          long[] longValues = longValuesArray[i];
+          for (int groupKey : groupKeysArray[i]) {
+            HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKey);
+            for (long value : longValues) {
+              hyperLogLogPlus.offer(value);
+            }
+          }
+        }
+        break;
+      case FLOAT:
+        float[][] floatValuesArray = blockValSet.getFloatValuesMV();
+        for (int i = 0; i < length; i++) {
+          float[] floatValues = floatValuesArray[i];
+          for (int groupKey : groupKeysArray[i]) {
+            HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKey);
+            for (float value : floatValues) {
+              hyperLogLogPlus.offer(value);
+            }
+          }
+        }
+        break;
+      case DOUBLE:
+        double[][] doubleValuesArray = blockValSet.getDoubleValuesMV();
+        for (int i = 0; i < length; i++) {
+          double[] doubleValues = doubleValuesArray[i];
+          for (int groupKey : groupKeysArray[i]) {
+            HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKey);
+            for (double value : doubleValues) {
+              hyperLogLogPlus.offer(value);
+            }
+          }
+        }
+        break;
+      case STRING:
+        String[][] stringValuesArray = blockValSet.getStringValuesMV();
+        for (int i = 0; i < length; i++) {
+          String[] stringValues = stringValuesArray[i];
+          for (int groupKey : groupKeysArray[i]) {
+            HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKey);
+            for (String value : stringValues) {
+              hyperLogLogPlus.offer(value);
+            }
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_HLL_PLUS_MV aggregation function: " + storedType);
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusAggregationFunction.java
new file mode 100644
index 0000000000..facef6a222
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusAggregationFunction.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.SerializedHLLPlus;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Raw flavour of {@link DistinctCountHLLPlusAggregationFunction}.
+ *
+ * <p>Result-holder creation, aggregation, extraction, merging and the intermediate column type are all delegated to
+ * the wrapped function. Only the final result differs: the merged HyperLogLogPlus is wrapped in a
+ * {@link SerializedHLLPlus} and reported as a STRING column.
+ */
+public class DistinctCountRawHLLPlusAggregationFunction
+    extends BaseSingleInputAggregationFunction<HyperLogLogPlus, SerializedHLLPlus> {
+  // Performs the actual HyperLogLogPlus aggregation; this class only changes the final result representation
+  private final DistinctCountHLLPlusAggregationFunction _delegate;
+
+  public DistinctCountRawHLLPlusAggregationFunction(List<ExpressionContext> arguments) {
+    this(arguments.get(0), new DistinctCountHLLPlusAggregationFunction(arguments));
+  }
+
+  DistinctCountRawHLLPlusAggregationFunction(ExpressionContext expression,
+      DistinctCountHLLPlusAggregationFunction distinctCountHLLPlusAggregationFunction) {
+    super(expression);
+    _delegate = distinctCountHLLPlusAggregationFunction;
+  }
+
+  /** Returns the wrapped function that performs the aggregation. */
+  public DistinctCountHLLPlusAggregationFunction getDistinctCountHLLPlusAggregationFunction() {
+    return _delegate;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUS;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return _delegate.createAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return _delegate.createGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _delegate.aggregate(length, aggregationResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _delegate.aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _delegate.aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public HyperLogLogPlus extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return _delegate.extractAggregationResult(aggregationResultHolder);
+  }
+
+  @Override
+  public HyperLogLogPlus extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return _delegate.extractGroupByResult(groupByResultHolder, groupKey);
+  }
+
+  @Override
+  public HyperLogLogPlus merge(HyperLogLogPlus intermediateResult1, HyperLogLogPlus intermediateResult2) {
+    return _delegate.merge(intermediateResult1, intermediateResult2);
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return _delegate.getIntermediateResultColumnType();
+  }
+
+  /** The serialized sketch is reported as a STRING column. */
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+
+  /** Wraps the merged sketch so that it is returned in serialized form rather than as a count. */
+  @Override
+  public SerializedHLLPlus extractFinalResult(HyperLogLogPlus intermediateResult) {
+    return new SerializedHLLPlus(intermediateResult);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusMVAggregationFunction.java
new file mode 100644
index 0000000000..6ae2d04996
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusMVAggregationFunction.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Multi-value variant of {@link DistinctCountRawHLLPlusAggregationFunction}.
+ *
+ * <p>It wraps a {@link DistinctCountHLLPlusMVAggregationFunction}, so every value of each multi-value entry is
+ * aggregated, while the final result is still the serialized sketch.
+ */
+public class DistinctCountRawHLLPlusMVAggregationFunction extends DistinctCountRawHLLPlusAggregationFunction {
+
+  public DistinctCountRawHLLPlusMVAggregationFunction(List<ExpressionContext> arguments) {
+    // The first argument is the aggregated expression; the full list is forwarded so the MV delegate can parse it
+    super(arguments.get(0), new DistinctCountHLLPlusMVAggregationFunction(arguments));
+  }
+
+  /** Identifies this function as {@code DISTINCTCOUNTRAWHLLPLUSMV}. */
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUSMV;
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index de8abca960..0caf40536b 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -172,6 +172,18 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.DISTINCTCOUNTRAWHLL);
     assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
 
+    function = getFunction("DiStInCtCoUnThLlPlUs");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, false);
+    assertTrue(aggregationFunction instanceof 
DistinctCountHLLPlusAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.DISTINCTCOUNTHLLPLUS);
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
+    function = getFunction("DiStInCtCoUnTrAwHlLpLuS");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, false);
+    assertTrue(aggregationFunction instanceof 
DistinctCountRawHLLPlusAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUS);
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
     function = getFunction("FaStHlL");
     aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, false);
     assertTrue(aggregationFunction instanceof FastHLLAggregationFunction);
@@ -358,6 +370,18 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.DISTINCTCOUNTRAWHLLMV);
     assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
 
+    function = getFunction("DiStInCt_CoUnT_hLl_PlUs_Mv");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, false);
+    assertTrue(aggregationFunction instanceof 
DistinctCountHLLPlusMVAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.DISTINCTCOUNTHLLPLUSMV);
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
+    function = getFunction("DiStInCtCoUnTrAwHlLpLuS_mV");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, false);
+    assertTrue(aggregationFunction instanceof 
DistinctCountRawHLLPlusMVAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUSMV);
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
     function = getFunction("PeRcEnTiLe10Mv");
     aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, false);
     assertTrue(aggregationFunction instanceof PercentileMVAggregationFunction);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index 33c32db70a..f974aa8542 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -408,6 +408,39 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
     QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 
124960L, 400000L, expectedResultTable);
   }
 
+  @Test
+  public void testDistinctCountHLLPlusMV() {
+    String query = "SELECT DISTINCTCOUNTHLLPLUSMV(column6) AS value FROM testTable";
+    DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.LONG});
+    Object[] expectedResults = new Object[1];
+    ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults));
+
+    // Case i runs (query + querySuffixes[i]) and expects expectedCounts[i] together with the stats
+    // {numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter}. Without filter, the query should be
+    // answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) for dictionary based columns.
+    String[] querySuffixes = {"", FILTER, SV_GROUP_BY, FILTER + SV_GROUP_BY, MV_GROUP_BY, FILTER + MV_GROUP_BY};
+    long[] expectedCounts = {18651L, 1176L, 4796L, 1176L, 3457L, 579L};
+    long[][] expectedStats = {
+        {400000L, 0L, 0L},
+        {62480L, 455552L, 62480L},
+        {400000L, 0L, 800000L},
+        {62480L, 455552L, 124960L},
+        {400000L, 0L, 800000L},
+        {62480L, 455552L, 124960L}
+    };
+    for (int i = 0; i < querySuffixes.length; i++) {
+      BrokerResponseNative brokerResponse = getBrokerResponse(query + querySuffixes[i]);
+      expectedResults[0] = expectedCounts[i];
+      long[] stats = expectedStats[i];
+      QueriesTestUtils.testInterSegmentsResult(brokerResponse, stats[0], stats[1], stats[2], 400000L,
+          expectedResultTable);
+    }
+  }
+
   @Test
   public void testDistinctCountRawHLLMV() {
     String query = "SELECT DISTINCTCOUNTRAWHLLMV(column6) AS value FROM 
testTable";
@@ -449,6 +482,48 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
         cardinalityExtractor);
   }
 
+  @Test
+  public void testDistinctCountRawHLLPLUSMV() {
+    String query = "SELECT DISTINCTCOUNTRAWHLLPLUSMV(column6) AS value FROM testTable";
+    // The raw function returns a hex-encoded serialized sketch; compare on its cardinality instead
+    Function<Object, Object> cardinalityExtractor =
+        value -> ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(BytesUtils.toBytes((String) value))
+            .cardinality();
+    DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.LONG});
+    Object[] expectedResults = new Object[1];
+    ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults));
+
+    // Case i runs (query + querySuffixes[i]) and expects expectedCounts[i] together with the stats
+    // {numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter}. Without filter, the query should be
+    // answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) for dictionary based columns.
+    String[] querySuffixes = {"", FILTER, SV_GROUP_BY, FILTER + SV_GROUP_BY, MV_GROUP_BY, FILTER + MV_GROUP_BY};
+    long[] expectedCounts = {18651L, 1176L, 4796L, 1176L, 3457L, 579L};
+    long[][] expectedStats = {
+        {400000L, 0L, 0L},
+        {62480L, 455552L, 62480L},
+        {400000L, 0L, 800000L},
+        {62480L, 455552L, 124960L},
+        {400000L, 0L, 800000L},
+        {62480L, 455552L, 124960L}
+    };
+    for (int i = 0; i < querySuffixes.length; i++) {
+      BrokerResponseNative brokerResponse = getBrokerResponse(query + querySuffixes[i]);
+      expectedResults[0] = expectedCounts[i];
+      long[] stats = expectedStats[i];
+      QueriesTestUtils.testInterSegmentsResult(brokerResponse, stats[0], stats[1], stats[2], 400000L,
+          expectedResultTable, cardinalityExtractor);
+    }
+  }
+
   @Test
   public void testPercentileMV() {
     List<String> queries = Arrays.asList("SELECT PERCENTILE50MV(column6) AS 
value FROM testTable",
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
index c9f0c444af..591b5ffffa 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
@@ -350,6 +350,39 @@ public class 
InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
     QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 
124960L, 400000L, expectedResultTable);
   }
 
+  @Test
+  public void testDistinctCountHLLPLUSMV() {
+    String query = "SELECT DISTINCTCOUNTHLLPLUSMV(column6) AS value FROM testTable";
+
+    // column6 is expected to be raw (no dictionary) in this test class, so the query is not answered from the
+    // dictionary: even without a filter every entry is scanned (numEntriesScannedPostFilter = 400000).
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new DataSchema.ColumnDataType[]
+        {DataSchema.ColumnDataType.LONG});
+    Object[] expectedResults = new Object[]{18651L};
+    ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 400000L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + FILTER);
+    expectedResults[0] = 1176L;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 62480L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + SV_GROUP_BY);
+    expectedResults[0] = 4796L;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY);
+    expectedResults[0] = 1176L;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + MV_GROUP_BY);
+    expectedResults[0] = 3457L;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY);
+    expectedResults[0] = 579L;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable);
+  }
+
   @Test
   public void testDistinctCountRawHLLMV() {
     String query = "SELECT DISTINCTCOUNTRAWHLLMV(column6) AS value FROM 
testTable";
@@ -391,6 +424,48 @@ public class 
InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
         cardinalityExtractor);
   }
 
+  @Test
+  public void testDistinctCountRawHLLPLUSMV() {
+    String query = "SELECT DISTINCTCOUNTRAWHLLPLUSMV(column6) AS value FROM testTable";
+    // The raw function returns a hex-encoded serialized sketch; compare on its cardinality instead
+    Function<Object, Object> cardinalityExtractor =
+        value -> ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(BytesUtils.toBytes((String) value))
+            .cardinality();
+
+    // column6 is expected to be raw (no dictionary) in this test class, so the query is not answered from the
+    // dictionary: even without a filter every entry is scanned (numEntriesScannedPostFilter = 400000).
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new DataSchema.ColumnDataType[]
+        {DataSchema.ColumnDataType.LONG});
+    Object[] expectedResults = new Object[]{18651L};
+    ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 400000L, 400000L, expectedResultTable,
+        cardinalityExtractor);
+
+    brokerResponse = getBrokerResponse(query + FILTER);
+    expectedResults[0] = 1176L;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 62480L, 400000L, expectedResultTable,
+        cardinalityExtractor);
+
+    brokerResponse = getBrokerResponse(query + SV_GROUP_BY);
+    expectedResults[0] = 4796L;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable,
+        cardinalityExtractor);
+
+    brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY);
+    expectedResults[0] = 1176L;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable,
+        cardinalityExtractor);
+
+    brokerResponse = getBrokerResponse(query + MV_GROUP_BY);
+    expectedResults[0] = 3457L;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable,
+        cardinalityExtractor);
+
+    brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY);
+    expectedResults[0] = 579L;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable,
+        cardinalityExtractor);
+  }
+
   @Test
   public void testPercentileMV() {
     List<String> queries = Arrays.asList("SELECT PERCENTILE50MV(column6) AS 
value FROM testTable",
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
index a6e8320962..d852a8c626 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
@@ -283,6 +283,32 @@ public class InterSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
     QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 
73548L, 120000L, expectedResultTable);
   }
 
+  @Test
+  public void testDistinctCountHLLPlus() {
+    String baseQuery = "SELECT DISTINCTCOUNTHLLPLUS(column1) AS v1, DISTINCTCOUNTHLLPLUS(column3) AS v2 FROM testTable";
+    DataSchema schema =
+        new DataSchema(new String[]{"v1", "v2"}, new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.LONG});
+
+    // One entry per query variant: suffix appended to the base query, expected {v1, v2} row, and the expected
+    // scan statistics passed to QueriesTestUtils.testInterSegmentsResult (in its argument order).
+    // The unfiltered variant is answered by NonScanBasedAggregationOperator for dictionary based columns, hence the
+    // zero numEntriesScannedPostFilter in the first stats row.
+    String[] suffixes = {"", FILTER, GROUP_BY, FILTER + GROUP_BY};
+    Object[][] expectedRows = {{6595L, 21822L}, {1885L, 4545L}, {3495L, 12022L}, {1273L, 3284L}};
+    long[][] expectedStats = {
+        {120000L, 0L, 0L, 120000L},
+        {24516L, 252256L, 49032L, 120000L},
+        {120000L, 0L, 360000L, 120000L},
+        {24516L, 252256L, 73548L, 120000L}
+    };
+
+    for (int c = 0; c < suffixes.length; c++) {
+      BrokerResponseNative response = getBrokerResponse(baseQuery + suffixes[c]);
+      ResultTable expected = new ResultTable(schema, Collections.singletonList(expectedRows[c]));
+      long[] stats = expectedStats[c];
+      QueriesTestUtils.testInterSegmentsResult(response, stats[0], stats[1], stats[2], stats[3], expected);
+    }
+  }
+
   @Test
   public void testDistinctCountRawHLL() {
     String query = "SELECT DISTINCTCOUNTRAWHLL(column1) AS v1, 
DISTINCTCOUNTRAWHLL(column3) AS v2 FROM testTable";
@@ -315,6 +341,40 @@ public class InterSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
         cardinalityExtractor);
   }
 
+  @Test
+  public void testDistinctCountRawHLLPlus() {
+    String baseQuery =
+        "SELECT DISTINCTCOUNTRAWHLLPLUS(column1) AS v1, DISTINCTCOUNTRAWHLLPLUS(column3) AS v2 FROM testTable";
+    // Raw results are serialized sketches rendered as strings; compare them by the sketch's estimated cardinality.
+    Function<Object, Object> cardinalityExtractor = rawValue -> {
+      byte[] sketchBytes = BytesUtils.toBytes((String) rawValue);
+      return ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(sketchBytes).cardinality();
+    };
+    DataSchema schema =
+        new DataSchema(new String[]{"v1", "v2"}, new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.LONG});
+
+    // One entry per query variant: suffix appended to the base query, expected {v1, v2} cardinalities, and the
+    // expected scan statistics passed to QueriesTestUtils.testInterSegmentsResult (in its argument order).
+    // The unfiltered variant is answered by NonScanBasedAggregationOperator for dictionary based columns, hence the
+    // zero numEntriesScannedPostFilter in the first stats row.
+    String[] suffixes = {"", FILTER, GROUP_BY, FILTER + GROUP_BY};
+    Object[][] expectedRows = {{6595L, 21822L}, {1885L, 4545L}, {3495L, 12022L}, {1273L, 3284L}};
+    long[][] expectedStats = {
+        {120000L, 0L, 0L, 120000L},
+        {24516L, 252256L, 49032L, 120000L},
+        {120000L, 0L, 360000L, 120000L},
+        {24516L, 252256L, 73548L, 120000L}
+    };
+
+    for (int c = 0; c < suffixes.length; c++) {
+      BrokerResponseNative response = getBrokerResponse(baseQuery + suffixes[c]);
+      ResultTable expected = new ResultTable(schema, Collections.singletonList(expectedRows[c]));
+      long[] stats = expectedStats[c];
+      QueriesTestUtils.testInterSegmentsResult(response, stats[0], stats[1], stats[2], stats[3], expected,
+          cardinalityExtractor);
+    }
+  }
+
   @Test
   public void testPercentile() {
     List<String> queries =
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
index bba665f9e8..17bd6494a7 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.queries;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.tdunning.math.stats.MergingDigest;
 import com.tdunning.math.stats.TDigest;
 import java.io.File;
@@ -82,6 +83,8 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
   private static final String DISTINCT_COUNT_HLL_COLUMN = 
"distinctCountHLLColumn";
   // Use non-default log2m
   private static final int DISTINCT_COUNT_HLL_LOG2M = 9;
+  private static final String DISTINCT_COUNT_HLL_PLUS_COLUMN = 
"distinctCountHLLPlusColumn";
+  private static final int DISTINCT_COUNT_HLL_PLUS_P = 14;
   private static final String MIN_MAX_RANGE_COLUMN = "minMaxRangeColumn";
   private static final String PERCENTILE_EST_COLUMN = "percentileEstColumn";
   // Use non-default max error
@@ -101,6 +104,7 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
   private final int[][] _valuesArray = new 
int[NUM_ROWS][MAX_NUM_VALUES_TO_PRE_AGGREGATE];
   private final AvgPair[] _avgPairs = new AvgPair[NUM_ROWS];
   private final HyperLogLog[] _hyperLogLogs = new HyperLogLog[NUM_ROWS];
+  private final HyperLogLogPlus[] _hyperLogLogPluses = new 
HyperLogLogPlus[NUM_ROWS];
   private final MinMaxRangePair[] _minMaxRangePairs = new 
MinMaxRangePair[NUM_ROWS];
   private final QuantileDigest[] _quantileDigests = new 
QuantileDigest[NUM_ROWS];
   private final TDigest[] _tDigests = new TDigest[NUM_ROWS];
@@ -193,6 +197,14 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
       _tDigests[i] = tDigest;
       valueMap.put(PERCENTILE_TDIGEST_COLUMN, 
ObjectSerDeUtils.TDIGEST_SER_DE.serialize(tDigest));
 
+      HyperLogLogPlus hyperLogLogPlus = new 
HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+      for (int value : values) {
+        hyperLogLogPlus.offer(value);
+      }
+      _hyperLogLogPluses[i] = hyperLogLogPlus;
+      valueMap.put(DISTINCT_COUNT_HLL_PLUS_COLUMN,
+          
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus));
+
       GenericRow genericRow = new GenericRow();
       genericRow.init(valueMap);
       rows.add(genericRow);
@@ -201,8 +213,9 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
     Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
         .addSingleValueDimension(GROUP_BY_SV_COLUMN, DataType.STRING)
         .addMultiValueDimension(GROUP_BY_MV_COLUMN, 
DataType.STRING).addMetric(AVG_COLUMN, DataType.BYTES)
-        .addMetric(DISTINCT_COUNT_HLL_COLUMN, 
DataType.BYTES).addMetric(MIN_MAX_RANGE_COLUMN, DataType.BYTES)
-        .addMetric(PERCENTILE_EST_COLUMN, 
DataType.BYTES).addMetric(PERCENTILE_TDIGEST_COLUMN, DataType.BYTES).build();
+        .addMetric(DISTINCT_COUNT_HLL_COLUMN, 
DataType.BYTES).addMetric(DISTINCT_COUNT_HLL_PLUS_COLUMN, DataType.BYTES)
+        .addMetric(MIN_MAX_RANGE_COLUMN, 
DataType.BYTES).addMetric(PERCENTILE_EST_COLUMN, DataType.BYTES)
+        .addMetric(PERCENTILE_TDIGEST_COLUMN, DataType.BYTES).build();
 
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
 
@@ -224,7 +237,7 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
     AggregationOperator aggregationOperator = 
getOperator(getAggregationQuery());
     List<Object> aggregationResult = 
aggregationOperator.nextBlock().getResults();
     assertNotNull(aggregationResult);
-    assertEquals(aggregationResult.size(), 5);
+    assertEquals(aggregationResult.size(), 6);
 
     // Avg
     AvgPair avgPair = (AvgPair) aggregationResult.get(0);
@@ -277,13 +290,24 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
       expectedTDigest.add(_tDigests[i]);
     }
     assertEquals(tDigest.quantile(0.5), expectedTDigest.quantile(0.5), 
PERCENTILE_TDIGEST_DELTA);
+
+    // DistinctCountHLLPlus
+    HyperLogLogPlus hyperLogLogPlus = (HyperLogLogPlus) 
aggregationResult.get(5);
+    HyperLogLogPlus expectedHyperLogLogPlus = new 
HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+    for (int value : _valuesArray[0]) {
+      expectedHyperLogLogPlus.offer(value);
+    }
+    for (int i = 1; i < NUM_ROWS; i++) {
+      expectedHyperLogLogPlus.addAll(_hyperLogLogPluses[i]);
+    }
+    assertEquals(hyperLogLogPlus.cardinality(), 
expectedHyperLogLogPlus.cardinality());
   }
 
   @Test
   public void testInterSegmentsAggregation()
       throws Exception {
     Object[] aggregationResults = 
getBrokerResponse(getAggregationQuery()).getResultTable().getRows().get(0);
-    assertEquals(aggregationResults.length, 5);
+    assertEquals(aggregationResults.length, 6);
 
     // Simulate the process of server side merge and broker side merge
 
@@ -372,11 +396,31 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
     tDigest1.add(tDigest2);
     double expectedPercentileTDigestResult = tDigest1.quantile(0.5);
 
+    // DistinctCountHLLPlus
+    HyperLogLogPlus hyperLogLogPlus1 = new 
HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+    HyperLogLogPlus hyperLogLogPlus2 = new 
HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+    for (int value : _valuesArray[0]) {
+      hyperLogLogPlus1.offer(value);
+      hyperLogLogPlus2.offer(value);
+    }
+    for (int i = 1; i < NUM_ROWS; i++) {
+      hyperLogLogPlus1.addAll(_hyperLogLogPluses[i]);
+      hyperLogLogPlus2.addAll(_hyperLogLogPluses[i]);
+    }
+    hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+    hyperLogLogPlus1 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+        
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+    hyperLogLogPlus2 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+        
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+    hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+    long expectedDistinctCountHllPlusResult = hyperLogLogPlus1.cardinality();
+
     assertEquals((Double) aggregationResults[0], expectedAvgResult, 1e-5);
     assertEquals((long) aggregationResults[1], expectedDistinctCountHllResult);
     assertEquals((Double) aggregationResults[2], expectedMinMaxRangeResult, 
1e-5);
     assertEquals((long) aggregationResults[3], expectedPercentileEstResult);
     assertEquals((Double) aggregationResults[4], 
expectedPercentileTDigestResult, PERCENTILE_TDIGEST_DELTA);
+    assertEquals((long) aggregationResults[5], 
expectedDistinctCountHllPlusResult);
   }
 
   @Test
@@ -442,6 +486,17 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
         expectedTDigest.add(_tDigests[i]);
       }
       assertEquals(tDigest.quantile(0.5), expectedTDigest.quantile(0.5), 
PERCENTILE_TDIGEST_DELTA);
+
+      // DistinctCountHLLPlus
+      HyperLogLogPlus hyperLogLogPlus = (HyperLogLogPlus) 
groupByResult.getResultForGroupId(5, groupKey._groupId);
+      HyperLogLogPlus expectedHyperLogLogPlus = new 
HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+      for (int value : _valuesArray[groupId]) {
+        expectedHyperLogLogPlus.offer(value);
+      }
+      for (int i = groupId + NUM_GROUPS; i < NUM_ROWS; i += NUM_GROUPS) {
+        expectedHyperLogLogPlus.addAll(_hyperLogLogPluses[i]);
+      }
+      assertEquals(hyperLogLogPlus.cardinality(), 
expectedHyperLogLogPlus.cardinality());
     }
   }
 
@@ -539,12 +594,32 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
       tDigest1.add(tDigest2);
       double expectedPercentileTDigestResult = tDigest1.quantile(0.5);
 
+      // DistinctCountHLLPlus
+      HyperLogLogPlus hyperLogLogPlus1 = new 
HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+      HyperLogLogPlus hyperLogLogPlus2 = new 
HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+      for (int value : _valuesArray[groupId]) {
+        hyperLogLogPlus1.offer(value);
+        hyperLogLogPlus2.offer(value);
+      }
+      for (int i = groupId + NUM_GROUPS; i < NUM_ROWS; i += NUM_GROUPS) {
+        hyperLogLogPlus1.addAll(_hyperLogLogPluses[i]);
+        hyperLogLogPlus2.addAll(_hyperLogLogPluses[i]);
+      }
+      hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+      hyperLogLogPlus1 = 
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+          
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+      hyperLogLogPlus2 = 
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+          
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+      hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+      long expectedDistinctCountHllPlusResult = hyperLogLogPlus1.cardinality();
+
       Object[] row = rows.get(groupId);
       assertEquals((Double) row[0], expectedAvgResult, 1e-5);
       assertEquals((long) row[1], expectedDistinctCountHllResult);
       assertEquals((Double) row[2], expectedMinMaxRangeResult, 1e-5);
       assertEquals((long) row[3], expectedPercentileEstResult);
       assertEquals((Double) row[4], expectedPercentileTDigestResult, 
PERCENTILE_TDIGEST_DELTA);
+      assertEquals((long) row[5], expectedDistinctCountHllPlusResult);
     }
   }
 
@@ -595,6 +670,15 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
       expectedTDigest.add(_tDigests[i]);
     }
 
+    // DistinctCountHLL
+    HyperLogLogPlus expectedHyperLogLogPlus = new 
HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+    for (int value : _valuesArray[0]) {
+      expectedHyperLogLogPlus.offer(value);
+    }
+    for (int i = 1; i < NUM_ROWS; i++) {
+      expectedHyperLogLogPlus.addAll(_hyperLogLogPluses[i]);
+    }
+
     Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = 
groupByResult.getGroupKeyIterator();
     while (groupKeyIterator.hasNext()) {
       GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
@@ -620,6 +704,10 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
       // PercentileTDigest
       TDigest tDigest = (TDigest) groupByResult.getResultForGroupId(4, 
groupKey._groupId);
       assertEquals(tDigest.quantile(0.5), expectedTDigest.quantile(0.5), 
PERCENTILE_TDIGEST_DELTA);
+
+      // DistinctCountHLLPlus
+      HyperLogLogPlus hyperLogLogPlus = (HyperLogLogPlus) 
groupByResult.getResultForGroupId(5, groupKey._groupId);
+      assertEquals(hyperLogLogPlus.cardinality(), 
expectedHyperLogLogPlus.cardinality());
     }
   }
 
@@ -716,20 +804,41 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
     tDigest1.add(tDigest2);
     double expectedPercentileTDigestResult = tDigest1.quantile(0.5);
 
+    // DistinctCountHLL
+    HyperLogLogPlus hyperLogLogPlus1 = new 
HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+    HyperLogLogPlus hyperLogLogPlus2 = new 
HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+    for (int value : _valuesArray[0]) {
+      hyperLogLogPlus1.offer(value);
+      hyperLogLogPlus2.offer(value);
+    }
+    for (int i = 1; i < NUM_ROWS; i++) {
+      hyperLogLogPlus1.addAll(_hyperLogLogPluses[i]);
+      hyperLogLogPlus2.addAll(_hyperLogLogPluses[i]);
+    }
+    hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+    hyperLogLogPlus1 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+        
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+    hyperLogLogPlus2 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+        
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+    hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+    long expectedDistinctCountHllPlusResult = hyperLogLogPlus1.cardinality();
+
     for (Object[] row : rows) {
       assertEquals((Double) row[0], expectedAvgResult, 1e-5);
       assertEquals((long) row[1], expectedDistinctCountHllResult);
       assertEquals((Double) row[2], expectedMinMaxRangeResult, 1e-5);
       assertEquals((long) row[3], expectedPercentileEstResult);
       assertEquals((Double) row[4], expectedPercentileTDigestResult, 
PERCENTILE_TDIGEST_DELTA);
+      assertEquals((long) row[5], expectedDistinctCountHllPlusResult);
     }
   }
 
+  /**
+   * Returns the aggregation query used by the aggregation tests. Results come back in select order:
+   * index 0 AVG, 1 DISTINCTCOUNTHLL, 2 MINMAXRANGE, 3 PERCENTILEEST50, 4 PERCENTILETDIGEST50,
+   * 5 DISTINCTCOUNTHLLPLUS (the tests read the HLL++ result at index 5).
+   */
   private String getAggregationQuery() {
     return String.format(
-        "SELECT AVG(%s), DISTINCTCOUNTHLL(%s), MINMAXRANGE(%s), 
PERCENTILEEST50(%s), PERCENTILETDIGEST50(%s) FROM %s",
+        "SELECT AVG(%s), DISTINCTCOUNTHLL(%s), MINMAXRANGE(%s), 
PERCENTILEEST50(%s), PERCENTILETDIGEST50(%s), "
+            + "DISTINCTCOUNTHLLPLUS(%s) FROM %s",
         AVG_COLUMN, DISTINCT_COUNT_HLL_COLUMN, MIN_MAX_RANGE_COLUMN, 
PERCENTILE_EST_COLUMN, PERCENTILE_TDIGEST_COLUMN,
-        RAW_TABLE_NAME);
+        DISTINCT_COUNT_HLL_PLUS_COLUMN, RAW_TABLE_NAME);
   }
 
   private String getGroupBySVQuery() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index df045e1d5b..d9d645c065 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2632,7 +2632,6 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     // The Accurate value is 6538.
     query = "SELECT distinctCount(FlightNum) FROM mytable ";
     
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 6538);
-    
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 6538);
 
     // Expected distinctCountHll with different log2m value from 2 to 19. The 
Accurate value is 6538.
     long[] expectedResults = new long[]{
@@ -2642,7 +2641,6 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     for (int i = 2; i < 20; i++) {
       query = String.format("SELECT distinctCountHLL(FlightNum, %d) FROM 
mytable ", i);
       
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 expectedResults[i - 2]);
-      
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 expectedResults[i - 2]);
     }
 
     // Default log2m for HLL is set to 12 in V1 and 8 in V2
@@ -2654,7 +2652,31 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
       expectedDefault = expectedResults[10];
     }
     
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 expectedDefault);
-    
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
 expectedDefault);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctCountHllPlus(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    // Exact distinct count of FlightNum (6538), as a reference for the estimates below.
+    String exactQuery = "SELECT distinctCount(FlightNum) FROM mytable ";
+    assertEquals(postQuery(exactQuery).get("resultTable").get("rows").get(0).get(0).asLong(), 6538);
+
+    // Expected distinctCountHllPlus estimates for p = 4 (minimal value) .. 19, indexed by p - minP.
+    int minP = 4;
+    long[] expectedResults = {
+        4901, 5755, 6207, 5651, 6318, 6671, 6559, 6425, 6490, 6486, 6489, 6516, 6532, 6526, 6525, 6534
+    };
+    for (int offset = 0; offset < expectedResults.length; offset++) {
+      String pQuery = String.format("SELECT distinctCountHLLPlus(FlightNum, %d) FROM mytable ", minP + offset);
+      assertEquals(postQuery(pQuery).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[offset]);
+    }
+
+    // Without an explicit p the function defaults to p = 14, so it must match the p = 14 estimate.
+    String defaultQuery = "SELECT distinctCountHLLPlus(FlightNum) FROM mytable ";
+    assertEquals(postQuery(defaultQuery).get("resultTable").get("rows").get(0).get(0).asLong(),
+        expectedResults[14 - minP]);
   }
 
   @Test
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
new file mode 100644
index 0000000000..66e38cd151
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.local.utils.HyperLogLogPlusUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * {@link ValueAggregator} that folds values into a {@link HyperLogLogPlus} (HLL++) sketch and stores the aggregated
+ * value as serialized bytes ({@link #AGGREGATED_VALUE_TYPE}). Used for both DISTINCTCOUNTHLLPLUS and
+ * DISTINCTCOUNTRAWHLLPLUS.
+ *
+ * <p>{@code byte[]} inputs are treated as serialized sketches and merged; any other input is offered to the sketch
+ * as a single value.
+ */
+public class DistinctCountHLLPlusValueAggregator implements ValueAggregator<Object, HyperLogLogPlus> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  // Precision parameters passed to new HyperLogLogPlus(p, sp) when a sketch is built from a raw value.
+  private final int _p;
+  private final int _sp;
+
+  // Serialized size recorded by the most recent getInitialAggregatedValue() call; 0 until then.
+  // NOTE(review): treated as fixed once known. That presumably only holds while sketches stay in HLL++'s
+  // non-sparse form; sparse sketches (sp > 0, or sparse serialized inputs) may grow as values are added — confirm.
+  private int _maxByteSize;
+
+  /**
+   * @param arguments aggregation function arguments. With 2 arguments the second is p (sp uses the default);
+   *                  with 3 arguments the second and third are p and sp; any other count (typically just the
+   *                  column) uses the defaults from {@link CommonConstants.Helix} for both.
+   */
+  public DistinctCountHLLPlusValueAggregator(List<ExpressionContext> arguments) {
+    if (arguments.size() == 2) {
+      _p = arguments.get(1).getLiteral().getIntValue();
+      _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP;
+    } else if (arguments.size() == 3) {
+      _p = arguments.get(1).getLiteral().getIntValue();
+      _sp = arguments.get(2).getLiteral().getIntValue();
+    } else {
+      _p = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_P;
+      _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP;
+    }
+  }
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    // This aggregator produces HLL++ sketches; reporting DISTINCTCOUNTHLL would label its bytes as a plain HLL
+    // aggregate, which cannot be deserialized with the HLL ser/de.
+    return AggregationFunctionType.DISTINCTCOUNTHLLPLUS;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  /**
+   * Creates the first aggregated value. A {@code byte[]} input is deserialized as a sketch; any other input is
+   * offered to a new {@code HyperLogLogPlus(p, sp)}. Also records the byte size used by
+   * {@link #getMaxAggregatedValueByteSize()}.
+   */
+  @Override
+  public HyperLogLogPlus getInitialAggregatedValue(Object rawValue) {
+    HyperLogLogPlus initialValue;
+    if (rawValue instanceof byte[]) {
+      byte[] bytes = (byte[]) rawValue;
+      initialValue = deserializeAggregatedValue(bytes);
+      _maxByteSize = bytes.length;
+    } else {
+      initialValue = new HyperLogLogPlus(_p, _sp);
+      initialValue.offer(rawValue);
+      _maxByteSize = HyperLogLogPlusUtils.byteSize(_p, _sp);
+    }
+    return initialValue;
+  }
+
+  /**
+   * Merges a serialized sketch ({@code byte[]}) or offers a single raw value into {@code value}, which is mutated in
+   * place and returned.
+   *
+   * @throws RuntimeException wrapping {@link CardinalityMergeException} if the sketches cannot be merged
+   */
+  @Override
+  public HyperLogLogPlus applyRawValue(HyperLogLogPlus value, Object rawValue) {
+    if (rawValue instanceof byte[]) {
+      try {
+        value.addAll(deserializeAggregatedValue((byte[]) rawValue));
+      } catch (CardinalityMergeException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      value.offer(rawValue);
+    }
+    return value;
+  }
+
+  /**
+   * Merges {@code aggregatedValue} into {@code value} (mutated in place) and returns {@code value}.
+   *
+   * @throws RuntimeException wrapping {@link CardinalityMergeException} if the sketches cannot be merged
+   */
+  @Override
+  public HyperLogLogPlus applyAggregatedValue(HyperLogLogPlus value, HyperLogLogPlus aggregatedValue) {
+    try {
+      value.addAll(aggregatedValue);
+      return value;
+    } catch (CardinalityMergeException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Returns an independent copy of {@code value} via a serialize/deserialize round trip. */
+  @Override
+  public HyperLogLogPlus cloneAggregatedValue(HyperLogLogPlus value) {
+    return deserializeAggregatedValue(serializeAggregatedValue(value));
+  }
+
+  @Override
+  public int getMaxAggregatedValueByteSize() {
+    // NOTE: For aggregated metrics, initial aggregated value might have not been generated. Returns the byte size
+    //       based on p and sp.
+    return _maxByteSize > 0 ? _maxByteSize : HyperLogLogPlusUtils.byteSize(_p, _sp);
+  }
+
+  @Override
+  public byte[] serializeAggregatedValue(HyperLogLogPlus value) {
+    return CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(value);
+  }
+
+  @Override
+  public HyperLogLogPlus deserializeAggregatedValue(byte[] bytes) {
+    return CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytes);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index b348b1ff4c..16dc0328f7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -70,6 +70,9 @@ public class ValueAggregatorFactory {
       case DISTINCTCOUNTTHETASKETCH:
       case DISTINCTCOUNTRAWTHETASKETCH:
         return new DistinctCountThetaSketchValueAggregator();
+      case DISTINCTCOUNTHLLPLUS:
+      case DISTINCTCOUNTRAWHLLPLUS:
+        return new DistinctCountHLLPlusValueAggregator(arguments);
       case DISTINCTCOUNTTUPLESKETCH:
       case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
       case AVGVALUEINTEGERSUMTUPLESKETCH:
@@ -116,6 +119,9 @@ public class ValueAggregatorFactory {
       case DISTINCTCOUNTTHETASKETCH:
       case DISTINCTCOUNTRAWTHETASKETCH:
         return DistinctCountThetaSketchValueAggregator.AGGREGATED_VALUE_TYPE;
+      case DISTINCTCOUNTHLLPLUS:
+      case DISTINCTCOUNTRAWHLLPLUS:
+        return DistinctCountHLLPlusValueAggregator.AGGREGATED_VALUE_TYPE;
       case DISTINCTCOUNTTUPLESKETCH:
       case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
       case AVGVALUEINTEGERSUMTUPLESKETCH:
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedHLLPlus.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedHLLPlus.java
new file mode 100644
index 0000000000..bee3e7884e
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedHLLPlus.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.spi.utils.BytesUtils;
+
+
+/**
+ * Comparable wrapper around a {@link HyperLogLogPlus}: instances are ordered by the sketch's estimated cardinality
+ * and rendered as the hex string of the serialized sketch.
+ */
+public class SerializedHLLPlus implements Comparable<SerializedHLLPlus> {
+  private final HyperLogLogPlus _hyperLogLogPlus;
+
+  public SerializedHLLPlus(HyperLogLogPlus hyperLogLogPlus) {
+    _hyperLogLogPlus = hyperLogLogPlus;
+  }
+
+  @Override
+  public int compareTo(SerializedHLLPlus other) {
+    long thisCardinality = _hyperLogLogPlus.cardinality();
+    long otherCardinality = other._hyperLogLogPlus.cardinality();
+    return Long.compare(thisCardinality, otherCardinality);
+  }
+
+  @Override
+  public String toString() {
+    byte[] serialized = CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(_hyperLogLogPlus);
+    return BytesUtils.toHexString(serialized);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
index 1ed3a3e341..f5a6275a3b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.utils;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.google.common.primitives.Longs;
 import com.tdunning.math.stats.MergingDigest;
 import com.tdunning.math.stats.TDigest;
@@ -188,6 +189,34 @@ public class CustomSerDeUtils {
     }
   };
 
+  /**
+   * Ser/de for {@link HyperLogLogPlus} using the sketch's own byte format ({@code getBytes()} and
+   * {@code HyperLogLogPlus.Builder.build(byte[])}). {@link IOException}s are rethrown as {@link RuntimeException}.
+   */
+  public static final ObjectSerDe<HyperLogLogPlus> HYPER_LOG_LOG_PLUS_SER_DE = new ObjectSerDe<HyperLogLogPlus>() {
+
+    @Override
+    public byte[] serialize(HyperLogLogPlus sketch) {
+      byte[] serialized;
+      try {
+        serialized = sketch.getBytes();
+      } catch (IOException ioe) {
+        throw new RuntimeException("Caught exception while serializing HyperLogLogPlus", ioe);
+      }
+      return serialized;
+    }
+
+    @Override
+    public HyperLogLogPlus deserialize(ByteBuffer byteBuffer) {
+      // Copy the remaining bytes (this advances the buffer position) and delegate to the byte[] overload.
+      int length = byteBuffer.remaining();
+      byte[] copy = new byte[length];
+      byteBuffer.get(copy);
+      return deserialize(copy);
+    }
+
+    @Override
+    public HyperLogLogPlus deserialize(byte[] bytes) {
+      HyperLogLogPlus sketch;
+      try {
+        sketch = HyperLogLogPlus.Builder.build(bytes);
+      } catch (IOException ioe) {
+        throw new RuntimeException("Caught exception while de-serializing HyperLogLogPlus", ioe);
+      }
+      return sketch;
+    }
+  };
+
   public static final ObjectSerDe<TDigest> TDIGEST_SER_DE = new 
ObjectSerDe<TDigest>() {
 
     @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HyperLogLogPlusUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HyperLogLogPlusUtils.java
new file mode 100644
index 0000000000..8614961511
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HyperLogLogPlusUtils.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+
+
+/**
+ * Byte-size helpers for clearspring {@link HyperLogLogPlus} sketches.
+ */
+public class HyperLogLogPlusUtils {
+  private HyperLogLogPlusUtils() {
+  }
+
+  /**
+   * Returns the byte size of the given HyperLogLogPlus.
+   * <p>Computed as {@code value.sizeof()} (the sketch's register/set data) plus a fixed
+   * {@code 3 * Integer.BYTES} = 12-byte allowance for the header.
+   * NOTE(review): confirm that 12 bytes covers the header actually written by
+   * {@link HyperLogLogPlus#getBytes()}.
+   */
+  public static int byteSize(HyperLogLogPlus value) {
+    // 12-byte header allowance (3 ints: p, sp & register set size) + register set data
+    return value.sizeof() + 3 * Integer.BYTES;
+  }
+
+  /**
+   * Returns the byte size of a HyperLogLogPlus built with the given {@code p} and {@code sp}.
+   * <p>Constructs a new, empty {@code HyperLogLogPlus(p, sp)} on every call and returns its
+   * {@code sizeof()} plus the same 12-byte ({@code 3 * Integer.BYTES}) header allowance.
+   * NOTE(review): for {@code sp > 0} the fresh sketch may start in a sparse representation, in which
+   * case the size of the empty sketch may not reflect the size of a populated one. Confirm before
+   * using this as an upper bound.
+   */
+  public static int byteSize(int p, int sp) {
+    // 12-byte header allowance (3 ints) + register set data of an empty sketch
+    return new HyperLogLogPlus(p, sp).sizeof() + 3 * Integer.BYTES;
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 4ac3b32af9..33313366a6 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -141,6 +141,15 @@ public enum AggregationFunctionType {
   PERCENTILERAWKLL("percentileRawKLL", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, 
SqlTypeFamily.NUMERIC)), ReturnTypes.VARCHAR_2000,
       ReturnTypes.explicit(SqlTypeName.OTHER)),
+  // hyper log log plus plus functions
+  DISTINCTCOUNTHLLPLUS("distinctCountHLLPlus", 
ImmutableList.of("DISTINCT_COUNT_HLL_PLUS"), SqlKind.OTHER_FUNCTION,
+  SqlFunctionCategory.USER_DEFINED_FUNCTION,
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.NUMERIC), ordinal -> ordinal > 0),
+  ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
+  DISTINCTCOUNTRAWHLLPLUS("distinctCountRawHLLPlus", 
ImmutableList.of("DISTINCT_COUNT_RAW_HLL_PLUS"),
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.INTEGER), ordinal -> ordinal > 0),
+  ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)),
 
   // DEPRECATED in v2
   @Deprecated
@@ -248,6 +257,13 @@ public enum AggregationFunctionType {
   PERCENTILERAWKLLMV("percentileRawKLLMV", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.ARRAY, 
SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC),
           ordinal -> ordinal > 1 && ordinal < 4), ReturnTypes.VARCHAR_2000, 
ReturnTypes.explicit(SqlTypeName.OTHER)),
+  // hyper log log plus plus functions
+  DISTINCTCOUNTHLLPLUSMV("distinctCountHLLPlusMV", null, 
SqlKind.OTHER_FUNCTION,
+      SqlFunctionCategory.USER_DEFINED_FUNCTION, 
OperandTypes.family(SqlTypeFamily.ARRAY), ReturnTypes.BIGINT,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
+  DISTINCTCOUNTRAWHLLPLUSMV("distinctCountRawHLLPlusMV", null, 
SqlKind.OTHER_FUNCTION,
+      SqlFunctionCategory.USER_DEFINED_FUNCTION, 
OperandTypes.family(SqlTypeFamily.ARRAY), ReturnTypes.VARCHAR_2000,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
 
   // boolean aggregate functions
   BOOLAND("boolAnd", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 91755df8fa..713bbb2fe3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -92,6 +92,8 @@ public class CommonConstants {
 
     public static final String DEFAULT_HYPERLOGLOG_LOG2M_KEY = 
"default.hyperloglog.log2m";
     public static final int DEFAULT_HYPERLOGLOG_LOG2M = 8;
+    public static final int DEFAULT_HYPERLOGLOG_PLUS_P = 14;
+    public static final int DEFAULT_HYPERLOGLOG_PLUS_SP = 0;
 
     // 2 to the power of 16, for tradeoffs see datasketches library 
documentation:
     // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to