This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e76b100b9 Support for DistinctSumMV and DistinctAvgMV aggregation 
functions (#10128)
5e76b100b9 is described below

commit 5e76b100b924340b1f401994f3cd84c876377291
Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com>
AuthorDate: Wed Jan 18 17:09:25 2023 -0800

    Support for DistinctSumMV and DistinctAvgMV aggregation functions (#10128)
    
    * Add support for DistinctSumMV and DistinctAverageMV aggregation functions
    
    * Empty commit to retrigger tests
    
    * Address review comments
---
 .../query/NonScanBasedAggregationOperator.java     |  10 +-
 .../pinot/core/plan/AggregationPlanNode.java       |   2 +-
 .../function/AggregationFunctionFactory.java       |   4 +
 .../BaseDistinctAggregateAggregationFunction.java  | 370 +++++++++++++++++----
 .../function/DistinctAvgAggregationFunction.java   |  25 +-
 ....java => DistinctAvgMVAggregationFunction.java} |  31 +-
 .../function/DistinctCountAggregationFunction.java |  25 +-
 .../DistinctCountMVAggregationFunction.java        | 244 +-------------
 .../function/DistinctSumAggregationFunction.java   |  25 +-
 ....java => DistinctSumMVAggregationFunction.java} |  31 +-
 .../pinot/queries/ExplainPlanQueriesTest.java      | 126 ++++---
 ...terSegmentAggregationMultiValueQueriesTest.java |  84 ++++-
 .../pinot/segment/spi/AggregationFunctionType.java |   2 +
 13 files changed, 625 insertions(+), 354 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
index e036e17a8d..bf7e4e63c2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
@@ -100,7 +100,11 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
           result = new MinMaxRangePair(getMinValue(dataSource), 
getMaxValue(dataSource));
           break;
         case DISTINCTCOUNT:
+        case DISTINCTSUM:
+        case DISTINCTAVG:
         case DISTINCTCOUNTMV:
+        case DISTINCTSUMMV:
+        case DISTINCTAVGMV:
           result = 
getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary()));
           break;
         case DISTINCTCOUNTHLL:
@@ -113,12 +117,6 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
           result = 
getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
               ((DistinctCountRawHLLAggregationFunction) 
aggregationFunction).getDistinctCountHLLAggregationFunction());
           break;
-        case DISTINCTSUM:
-          result = 
getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary()));
-          break;
-        case DISTINCTAVG:
-          result = 
getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary()));
-          break;
         case SEGMENTPARTITIONEDDISTINCTCOUNT:
           result = (long) 
Objects.requireNonNull(dataSource.getDictionary()).length();
           break;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 8ab98970f4..74e5951412 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -56,7 +56,7 @@ public class AggregationPlanNode implements PlanNode {
   private static final EnumSet<AggregationFunctionType> 
DICTIONARY_BASED_FUNCTIONS =
       EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, 
DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTCOUNTHLL,
           DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, 
SEGMENTPARTITIONEDDISTINCTCOUNT,
-          DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG);
+          DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV, 
DISTINCTAVGMV);
 
   // DISTINCTCOUNT excluded because consuming segment metadata contains 
unknown cardinality when there is no dictionary
   private static final EnumSet<AggregationFunctionType> 
METADATA_BASED_FUNCTIONS =
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 157d1b64da..9b571ff3c4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -266,6 +266,10 @@ public class AggregationFunctionFactory {
             return new DistinctCountHLLMVAggregationFunction(arguments);
           case DISTINCTCOUNTRAWHLLMV:
             return new DistinctCountRawHLLMVAggregationFunction(arguments);
+          case DISTINCTSUMMV:
+            return new DistinctSumMVAggregationFunction(firstArgument);
+          case DISTINCTAVGMV:
+            return new DistinctAvgMVAggregationFunction(firstArgument);
           case DISTINCT:
             return new DistinctAggregationFunction(arguments, 
queryContext.getOrderByExpressions(),
                 queryContext.getLimit());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java
index 0f9f09a392..0dc2ed35e5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java
@@ -71,7 +71,73 @@ public abstract class 
BaseDistinctAggregateAggregationFunction<T extends Compara
   }
 
   @Override
-  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+  public Set extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    Object result = aggregationResultHolder.getResult();
+    if (result == null) {
+      // Use empty IntOpenHashSet as a place holder for empty result
+      return new IntOpenHashSet();
+    }
+
+    if (result instanceof DictIdsWrapper) {
+      // For dictionary-encoded expression, convert dictionary ids to values
+      return convertToValueSet((DictIdsWrapper) result);
+    } else {
+      // For non-dictionary-encoded expression, directly return the value set
+      return (Set) result;
+    }
+  }
+
+  @Override
+  public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int 
groupKey) {
+    Object result = groupByResultHolder.getResult(groupKey);
+    if (result == null) {
+      // NOTE: Return an empty IntOpenHashSet for empty result.
+      return new IntOpenHashSet();
+    }
+
+    if (result instanceof DictIdsWrapper) {
+      // For dictionary-encoded expression, convert dictionary ids to values
+      return convertToValueSet((DictIdsWrapper) result);
+    } else {
+      // For non-dictionary-encoded expression, directly return the value set
+      return (Set) result;
+    }
+  }
+
+  @Override
+  public Set merge(Set intermediateResult1, Set intermediateResult2) {
+    if (intermediateResult1.isEmpty()) {
+      return intermediateResult2;
+    }
+    if (intermediateResult2.isEmpty()) {
+      return intermediateResult1;
+    }
+    intermediateResult1.addAll(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  /**
+   * Returns the dictionary id bitmap from the result holder or creates a new 
one if it does not exist.
+   */
+  protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder 
aggregationResultHolder,
+      Dictionary dictionary) {
+    DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
+    if (dictIdsWrapper == null) {
+      dictIdsWrapper = new DictIdsWrapper(dictionary);
+      aggregationResultHolder.setValue(dictIdsWrapper);
+    }
+    return dictIdsWrapper._dictIdBitmap;
+  }
+
+  /**
+   * Performs aggregation for a SV column
+   */
+  protected void svAggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
 
@@ -136,8 +202,85 @@ public abstract class 
BaseDistinctAggregateAggregationFunction<T extends Compara
     }
   }
 
-  @Override
-  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+  /**
+   * Performs aggregation for a MV column
+   */
+  protected void mvAggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder, 
dictionary);
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        dictIdBitmap.add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the value set
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    Set valueSet = getValueSet(aggregationResultHolder, storedType);
+    switch (storedType) {
+      case INT:
+        IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
+        int[][] intValues = blockValSet.getIntValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int value : intValues[i]) {
+            intSet.add(value);
+          }
+        }
+        break;
+      case LONG:
+        LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
+        long[][] longValues = blockValSet.getLongValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (long value : longValues[i]) {
+            longSet.add(value);
+          }
+        }
+        break;
+      case FLOAT:
+        FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
+        float[][] floatValues = blockValSet.getFloatValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (float value : floatValues[i]) {
+            floatSet.add(value);
+          }
+        }
+        break;
+      case DOUBLE:
+        DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
+        double[][] doubleValues = blockValSet.getDoubleValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (double value : doubleValues[i]) {
+            doubleSet.add(value);
+          }
+        }
+        break;
+      case STRING:
+        ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>) 
valueSet;
+        String[][] stringValues = blockValSet.getStringValuesMV();
+        for (int i = 0; i < length; i++) {
+          //noinspection ManualArrayToCollectionCopy
+          for (String value : stringValues[i]) {
+            //noinspection UseBulkOperation
+            stringSet.add(value);
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for " + _functionType.getName() + " aggregation 
function: " + storedType);
+    }
+  }
+
+  /**
+   * Performs aggregation for a SV column with group by on a SV column.
+   */
+  protected void svAggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
 
@@ -199,8 +342,86 @@ public abstract class 
BaseDistinctAggregateAggregationFunction<T extends Compara
     }
   }
 
-  @Override
-  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+  /**
+   * Performs aggregation for a MV column with group by on a SV column.
+   */
+  protected void mvAggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        getDictIdBitmap(groupByResultHolder, groupKeyArray[i], 
dictionary).add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the value set
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    switch (storedType) {
+      case INT:
+        int[][] intValues = blockValSet.getIntValuesMV();
+        for (int i = 0; i < length; i++) {
+          IntOpenHashSet intSet = (IntOpenHashSet) 
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT);
+          for (int value : intValues[i]) {
+            intSet.add(value);
+          }
+        }
+        break;
+      case LONG:
+        long[][] longValues = blockValSet.getLongValuesMV();
+        for (int i = 0; i < length; i++) {
+          LongOpenHashSet longSet = (LongOpenHashSet) 
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.LONG);
+          for (long value : longValues[i]) {
+            longSet.add(value);
+          }
+        }
+        break;
+      case FLOAT:
+        float[][] floatValues = blockValSet.getFloatValuesMV();
+        for (int i = 0; i < length; i++) {
+          FloatOpenHashSet floatSet =
+              (FloatOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.FLOAT);
+          for (float value : floatValues[i]) {
+            floatSet.add(value);
+          }
+        }
+        break;
+      case DOUBLE:
+        double[][] doubleValues = blockValSet.getDoubleValuesMV();
+        for (int i = 0; i < length; i++) {
+          DoubleOpenHashSet doubleSet =
+              (DoubleOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.DOUBLE);
+          for (double value : doubleValues[i]) {
+            doubleSet.add(value);
+          }
+        }
+        break;
+      case STRING:
+        String[][] stringValues = blockValSet.getStringValuesMV();
+        for (int i = 0; i < length; i++) {
+          ObjectOpenHashSet<String> stringSet =
+              (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.STRING);
+          //noinspection ManualArrayToCollectionCopy
+          for (String value : stringValues[i]) {
+            //noinspection UseBulkOperation
+            stringSet.add(value);
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for " + _functionType.getName() + " aggregation 
function: " + storedType);
+    }
+  }
+
+  /**
+   * Performs aggregation for a SV column with group by on a MV column.
+   */
+  protected void svAggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
 
@@ -259,68 +480,91 @@ public abstract class 
BaseDistinctAggregateAggregationFunction<T extends Compara
     }
   }
 
-  @Override
-  public Set extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
-    Object result = aggregationResultHolder.getResult();
-    if (result == null) {
-      // Use empty IntOpenHashSet as a place holder for empty result
-      return new IntOpenHashSet();
-    }
-
-    if (result instanceof DictIdsWrapper) {
-      // For dictionary-encoded expression, convert dictionary ids to values
-      return convertToValueSet((DictIdsWrapper) result);
-    } else {
-      // For non-dictionary-encoded expression, directly return the value set
-      return (Set) result;
-    }
-  }
-
-  @Override
-  public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int 
groupKey) {
-    Object result = groupByResultHolder.getResult(groupKey);
-    if (result == null) {
-      // NOTE: Return an empty IntOpenHashSet for empty result.
-      return new IntOpenHashSet();
-    }
-
-    if (result instanceof DictIdsWrapper) {
-      // For dictionary-encoded expression, convert dictionary ids to values
-      return convertToValueSet((DictIdsWrapper) result);
-    } else {
-      // For non-dictionary-encoded expression, directly return the value set
-      return (Set) result;
-    }
-  }
+  /**
+   * Performs aggregation for a MV column with group by on a MV column.
+   */
+  protected void mvAggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
 
-  @Override
-  public Set merge(Set intermediateResult1, Set intermediateResult2) {
-    if (intermediateResult1.isEmpty()) {
-      return intermediateResult2;
-    }
-    if (intermediateResult2.isEmpty()) {
-      return intermediateResult1;
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          getDictIdBitmap(groupByResultHolder, groupKey, 
dictionary).add(dictIds[i]);
+        }
+      }
+      return;
     }
-    intermediateResult1.addAll(intermediateResult2);
-    return intermediateResult1;
-  }
 
-  @Override
-  public ColumnDataType getIntermediateResultColumnType() {
-    return ColumnDataType.OBJECT;
-  }
-
-  /**
-   * Returns the dictionary id bitmap from the result holder or creates a new 
one if it does not exist.
-   */
-  protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder 
aggregationResultHolder,
-      Dictionary dictionary) {
-    DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
-    if (dictIdsWrapper == null) {
-      dictIdsWrapper = new DictIdsWrapper(dictionary);
-      aggregationResultHolder.setValue(dictIdsWrapper);
+    // For non-dictionary-encoded expression, store values into the value set
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    switch (storedType) {
+      case INT:
+        int[][] intValues = blockValSet.getIntValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            IntOpenHashSet intSet = (IntOpenHashSet) 
getValueSet(groupByResultHolder, groupKey, DataType.INT);
+            for (int value : intValues[i]) {
+              intSet.add(value);
+            }
+          }
+        }
+        break;
+      case LONG:
+        long[][] longValues = blockValSet.getLongValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            LongOpenHashSet longSet = (LongOpenHashSet) 
getValueSet(groupByResultHolder, groupKey, DataType.LONG);
+            for (long value : longValues[i]) {
+              longSet.add(value);
+            }
+          }
+        }
+        break;
+      case FLOAT:
+        float[][] floatValues = blockValSet.getFloatValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            FloatOpenHashSet floatSet = (FloatOpenHashSet) 
getValueSet(groupByResultHolder, groupKey, DataType.FLOAT);
+            for (float value : floatValues[i]) {
+              floatSet.add(value);
+            }
+          }
+        }
+        break;
+      case DOUBLE:
+        double[][] doubleValues = blockValSet.getDoubleValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            DoubleOpenHashSet doubleSet =
+                (DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey, 
DataType.DOUBLE);
+            for (double value : doubleValues[i]) {
+              doubleSet.add(value);
+            }
+          }
+        }
+        break;
+      case STRING:
+        String[][] stringValues = blockValSet.getStringValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            ObjectOpenHashSet<String> stringSet =
+                (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, 
groupKey, DataType.STRING);
+            //noinspection ManualArrayToCollectionCopy
+            for (String value : stringValues[i]) {
+              //noinspection UseBulkOperation
+              stringSet.add(value);
+            }
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for " + _functionType.getName() + " aggregation 
function: " + storedType);
     }
-    return dictIdsWrapper._dictIdBitmap;
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
index 41b8550963..bca958957f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
@@ -18,13 +18,18 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
+
 /**
- * Aggregation function to compute the average of distinct values.
+ * Aggregation function to compute the average of distinct values for an SV 
column.
  */
 public class DistinctAvgAggregationFunction extends 
BaseDistinctAggregateAggregationFunction<Double> {
 
@@ -32,6 +37,24 @@ public class DistinctAvgAggregationFunction extends 
BaseDistinctAggregateAggrega
     super(expression, AggregationFunctionType.DISTINCTAVG);
   }
 
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    svAggregate(length, aggregationResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    svAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, 
blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    svAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, 
blockValSetMap);
+  }
+
   @Override
   public DataSchema.ColumnDataType getFinalResultColumnType() {
     return DataSchema.ColumnDataType.DOUBLE;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgMVAggregationFunction.java
similarity index 55%
copy from 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgMVAggregationFunction.java
index 41b8550963..30a8e00492 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgMVAggregationFunction.java
@@ -18,18 +18,41 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
+
 /**
- * Aggregation function to compute the average of distinct values.
+ * Aggregation function to compute the average of distinct values for an MV 
column.
  */
-public class DistinctAvgAggregationFunction extends 
BaseDistinctAggregateAggregationFunction<Double> {
+public class DistinctAvgMVAggregationFunction extends 
BaseDistinctAggregateAggregationFunction<Double> {
+
+  public DistinctAvgMVAggregationFunction(ExpressionContext expression) {
+    super(expression, AggregationFunctionType.DISTINCTAVGMV);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    mvAggregate(length, aggregationResultHolder, blockValSetMap);
+  }
 
-  public DistinctAvgAggregationFunction(ExpressionContext expression) {
-    super(expression, AggregationFunctionType.DISTINCTAVG);
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    mvAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, 
blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    mvAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, 
blockValSetMap);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
index 899a2d6ab5..aec983b8df 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
@@ -18,13 +18,18 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
+
 /**
- * Aggregation function to compute the average of distinct values.
+ * Aggregation function to compute the count of distinct values for an SV column
  */
 public class DistinctCountAggregationFunction extends 
BaseDistinctAggregateAggregationFunction<Integer> {
 
@@ -32,6 +37,24 @@ public class DistinctCountAggregationFunction extends 
BaseDistinctAggregateAggre
     super(expression, AggregationFunctionType.DISTINCTCOUNT);
   }
 
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    svAggregate(length, aggregationResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    svAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, 
blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    svAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, 
blockValSetMap);
+  }
+
   @Override
   public ColumnDataType getFinalResultColumnType() {
     return ColumnDataType.INT;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
index 6dc65461a9..d8d257b400 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
@@ -18,261 +18,51 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
-import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
-import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
-import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class DistinctCountMVAggregationFunction extends 
DistinctCountAggregationFunction {
+/**
+ * Aggregation function to compute the count of distinct values for an MV column
+ */
+public class DistinctCountMVAggregationFunction extends 
BaseDistinctAggregateAggregationFunction<Integer> {
 
   public DistinctCountMVAggregationFunction(ExpressionContext expression) {
-    super(expression);
+    super(expression, AggregationFunctionType.DISTINCTCOUNTMV);
   }
 
-  @Override
-  public AggregationFunctionType getType() {
-    return AggregationFunctionType.DISTINCTCOUNTMV;
-  }
 
   @Override
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    BlockValSet blockValSet = blockValSetMap.get(_expression);
-
-    // For dictionary-encoded expression, store dictionary ids into the bitmap
-    Dictionary dictionary = blockValSet.getDictionary();
-    if (dictionary != null) {
-      RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder, 
dictionary);
-      int[][] dictIds = blockValSet.getDictionaryIdsMV();
-      for (int i = 0; i < length; i++) {
-        dictIdBitmap.add(dictIds[i]);
-      }
-      return;
-    }
-
-    // For non-dictionary-encoded expression, store values into the value set
-    DataType storedType = blockValSet.getValueType().getStoredType();
-    Set valueSet = getValueSet(aggregationResultHolder, storedType);
-    switch (storedType) {
-      case INT:
-        IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
-        int[][] intValues = blockValSet.getIntValuesMV();
-        for (int i = 0; i < length; i++) {
-          for (int value : intValues[i]) {
-            intSet.add(value);
-          }
-        }
-        break;
-      case LONG:
-        LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
-        long[][] longValues = blockValSet.getLongValuesMV();
-        for (int i = 0; i < length; i++) {
-          for (long value : longValues[i]) {
-            longSet.add(value);
-          }
-        }
-        break;
-      case FLOAT:
-        FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
-        float[][] floatValues = blockValSet.getFloatValuesMV();
-        for (int i = 0; i < length; i++) {
-          for (float value : floatValues[i]) {
-            floatSet.add(value);
-          }
-        }
-        break;
-      case DOUBLE:
-        DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
-        double[][] doubleValues = blockValSet.getDoubleValuesMV();
-        for (int i = 0; i < length; i++) {
-          for (double value : doubleValues[i]) {
-            doubleSet.add(value);
-          }
-        }
-        break;
-      case STRING:
-        ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>) 
valueSet;
-        String[][] stringValues = blockValSet.getStringValuesMV();
-        for (int i = 0; i < length; i++) {
-          //noinspection ManualArrayToCollectionCopy
-          for (String value : stringValues[i]) {
-            //noinspection UseBulkOperation
-            stringSet.add(value);
-          }
-        }
-        break;
-      default:
-        throw new IllegalStateException("Illegal data type for 
DISTINCT_COUNT_MV aggregation function: " + storedType);
-    }
+    mvAggregate(length, aggregationResultHolder, blockValSetMap);
   }
 
   @Override
   public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    BlockValSet blockValSet = blockValSetMap.get(_expression);
-
-    // For dictionary-encoded expression, store dictionary ids into the bitmap
-    Dictionary dictionary = blockValSet.getDictionary();
-    if (dictionary != null) {
-      int[][] dictIds = blockValSet.getDictionaryIdsMV();
-      for (int i = 0; i < length; i++) {
-        getDictIdBitmap(groupByResultHolder, groupKeyArray[i], 
dictionary).add(dictIds[i]);
-      }
-      return;
-    }
-
-    // For non-dictionary-encoded expression, store values into the value set
-    DataType storedType = blockValSet.getValueType().getStoredType();
-    switch (storedType) {
-      case INT:
-        int[][] intValues = blockValSet.getIntValuesMV();
-        for (int i = 0; i < length; i++) {
-          IntOpenHashSet intSet = (IntOpenHashSet) 
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT);
-          for (int value : intValues[i]) {
-            intSet.add(value);
-          }
-        }
-        break;
-      case LONG:
-        long[][] longValues = blockValSet.getLongValuesMV();
-        for (int i = 0; i < length; i++) {
-          LongOpenHashSet longSet = (LongOpenHashSet) 
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.LONG);
-          for (long value : longValues[i]) {
-            longSet.add(value);
-          }
-        }
-        break;
-      case FLOAT:
-        float[][] floatValues = blockValSet.getFloatValuesMV();
-        for (int i = 0; i < length; i++) {
-          FloatOpenHashSet floatSet =
-              (FloatOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.FLOAT);
-          for (float value : floatValues[i]) {
-            floatSet.add(value);
-          }
-        }
-        break;
-      case DOUBLE:
-        double[][] doubleValues = blockValSet.getDoubleValuesMV();
-        for (int i = 0; i < length; i++) {
-          DoubleOpenHashSet doubleSet =
-              (DoubleOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.DOUBLE);
-          for (double value : doubleValues[i]) {
-            doubleSet.add(value);
-          }
-        }
-        break;
-      case STRING:
-        String[][] stringValues = blockValSet.getStringValuesMV();
-        for (int i = 0; i < length; i++) {
-          ObjectOpenHashSet<String> stringSet =
-              (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.STRING);
-          //noinspection ManualArrayToCollectionCopy
-          for (String value : stringValues[i]) {
-            //noinspection UseBulkOperation
-            stringSet.add(value);
-          }
-        }
-        break;
-      default:
-        throw new IllegalStateException("Illegal data type for 
DISTINCT_COUNT_MV aggregation function: " + storedType);
-    }
+    mvAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, 
blockValSetMap);
   }
 
   @Override
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    mvAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, 
blockValSetMap);
+  }
 
-    // For dictionary-encoded expression, store dictionary ids into the bitmap
-    Dictionary dictionary = blockValSet.getDictionary();
-    if (dictionary != null) {
-      int[][] dictIds = blockValSet.getDictionaryIdsMV();
-      for (int i = 0; i < length; i++) {
-        for (int groupKey : groupKeysArray[i]) {
-          getDictIdBitmap(groupByResultHolder, groupKey, 
dictionary).add(dictIds[i]);
-        }
-      }
-      return;
-    }
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.INT;
+  }
 
-    // For non-dictionary-encoded expression, store hash code of the values 
into the value set
-    DataType storedType = blockValSet.getValueType().getStoredType();
-    switch (storedType) {
-      case INT:
-        int[][] intValues = blockValSet.getIntValuesMV();
-        for (int i = 0; i < length; i++) {
-          for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet intSet = (IntOpenHashSet) 
getValueSet(groupByResultHolder, groupKey, DataType.INT);
-            for (int value : intValues[i]) {
-              intSet.add(value);
-            }
-          }
-        }
-        break;
-      case LONG:
-        long[][] longValues = blockValSet.getLongValuesMV();
-        for (int i = 0; i < length; i++) {
-          for (int groupKey : groupKeysArray[i]) {
-            LongOpenHashSet longSet = (LongOpenHashSet) 
getValueSet(groupByResultHolder, groupKey, DataType.LONG);
-            for (long value : longValues[i]) {
-              longSet.add(value);
-            }
-          }
-        }
-        break;
-      case FLOAT:
-        float[][] floatValues = blockValSet.getFloatValuesMV();
-        for (int i = 0; i < length; i++) {
-          for (int groupKey : groupKeysArray[i]) {
-            FloatOpenHashSet floatSet = (FloatOpenHashSet) 
getValueSet(groupByResultHolder, groupKey, DataType.FLOAT);
-            for (float value : floatValues[i]) {
-              floatSet.add(value);
-            }
-          }
-        }
-        break;
-      case DOUBLE:
-        double[][] doubleValues = blockValSet.getDoubleValuesMV();
-        for (int i = 0; i < length; i++) {
-          for (int groupKey : groupKeysArray[i]) {
-            DoubleOpenHashSet doubleSet =
-                (DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey, 
DataType.DOUBLE);
-            for (double value : doubleValues[i]) {
-              doubleSet.add(value);
-            }
-          }
-        }
-        break;
-      case STRING:
-        String[][] stringValues = blockValSet.getStringValuesMV();
-        for (int i = 0; i < length; i++) {
-          for (int groupKey : groupKeysArray[i]) {
-            ObjectOpenHashSet<String> stringSet =
-                (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, 
groupKey, DataType.STRING);
-            //noinspection ManualArrayToCollectionCopy
-            for (String value : stringValues[i]) {
-              //noinspection UseBulkOperation
-              stringSet.add(value);
-            }
-          }
-        }
-        break;
-      default:
-        throw new IllegalStateException("Illegal data type for 
DISTINCT_COUNT_MV aggregation function: " + storedType);
-    }
+  @Override
+  public Integer extractFinalResult(Set intermediateResult) {
+    return intermediateResult.size();
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
index b42bc13b08..f3a6c55805 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
@@ -18,13 +18,18 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
+
 /**
- * Aggregation function to compute the sum of distinct values.
+ * Aggregation function to compute the sum of distinct values for an SV column.
  */
 public class DistinctSumAggregationFunction extends 
BaseDistinctAggregateAggregationFunction<Double> {
 
@@ -32,6 +37,24 @@ public class DistinctSumAggregationFunction extends 
BaseDistinctAggregateAggrega
     super(expression, AggregationFunctionType.DISTINCTSUM);
   }
 
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    svAggregate(length, aggregationResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    svAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, 
blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    svAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, 
blockValSetMap);
+  }
+
   @Override
   public DataSchema.ColumnDataType getFinalResultColumnType() {
     return DataSchema.ColumnDataType.DOUBLE;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumMVAggregationFunction.java
similarity index 52%
copy from 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumMVAggregationFunction.java
index b42bc13b08..3ffd0acd16 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumMVAggregationFunction.java
@@ -18,18 +18,41 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
+
 /**
- * Aggregation function to compute the sum of distinct values.
+ * Aggregation function to compute the sum of distinct values for an MV column.
  */
-public class DistinctSumAggregationFunction extends 
BaseDistinctAggregateAggregationFunction<Double> {
+public class DistinctSumMVAggregationFunction extends 
BaseDistinctAggregateAggregationFunction<Double> {
+
+  public DistinctSumMVAggregationFunction(ExpressionContext expression) {
+    super(expression, AggregationFunctionType.DISTINCTSUMMV);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    mvAggregate(length, aggregationResultHolder, blockValSetMap);
+  }
 
-  public DistinctSumAggregationFunction(ExpressionContext expression) {
-    super(expression, AggregationFunctionType.DISTINCTSUM);
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    mvAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, 
blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    mvAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, 
blockValSetMap);
   }
 
   @Override
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index b04cefb875..d74a81fa87 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -108,6 +108,8 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest 
{
   private final static String COL1_SORTED_INDEX = "sortedIndexCol1";
   private final static String COL1_JSON_INDEX = "jsonIndexCol1";
   private final static String COL1_TEXT_INDEX = "textIndexCol1";
+  private final static String MV_COL1_RAW = "mvRawCol1";
+  private final static String MV_COL1_NO_INDEX = "mvNoIndexCol1";
 
   private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
       .addSingleValueDimension(COL1_RAW, FieldSpec.DataType.INT)
@@ -123,14 +125,16 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
       .addSingleValueDimension(COL3_RANGE_INDEX, FieldSpec.DataType.INT)
       .addSingleValueDimension(COL1_SORTED_INDEX, FieldSpec.DataType.DOUBLE)
       .addSingleValueDimension(COL1_JSON_INDEX, FieldSpec.DataType.JSON)
-      .addSingleValueDimension(COL1_TEXT_INDEX, 
FieldSpec.DataType.STRING).build();
+      .addSingleValueDimension(COL1_TEXT_INDEX, FieldSpec.DataType.STRING)
+      .addMultiValueDimension(MV_COL1_RAW, FieldSpec.DataType.INT)
+      .addMultiValueDimension(MV_COL1_NO_INDEX, 
FieldSpec.DataType.INT).build();
 
   private static final DataSchema DATA_SCHEMA = new DataSchema(new 
String[]{"Operator", "Operator_Id", "Parent_Id"},
       new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
           DataSchema.ColumnDataType.INT});
 
   private static final TableConfig TABLE_CONFIG =
-      new 
TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(Arrays.asList(COL1_RAW))
+      new 
TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(Arrays.asList(COL1_RAW,
 MV_COL1_RAW))
           .setTableName(RAW_TABLE_NAME).build();
 
   private IndexSegment _indexSegment;
@@ -159,7 +163,7 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest 
{
   GenericRow createMockRecord(int noIndexCol1, int noIndexCol2, int 
noIndexCol3,
       boolean noIndexCol4, double invertedIndexCol1, int invertedIndexCol2, 
String intervedIndexCol3,
       double rangeIndexCol1, int rangeIndexCol2, int rangeIndexCol3, double 
sortedIndexCol1, String jsonIndexCol1,
-      String textIndexCol1, int rawCol1) {
+      String textIndexCol1, int rawCol1, Object[] mvRawCol1, Object[] 
mvNoIndexCol1) {
 
     GenericRow record = new GenericRow();
     record.putValue(COL1_RAW, rawCol1);
@@ -182,6 +186,9 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest 
{
     record.putValue(COL1_JSON_INDEX, jsonIndexCol1);
     record.putValue(COL1_TEXT_INDEX, textIndexCol1);
 
+    record.putValue(MV_COL1_RAW, mvRawCol1);
+    record.putValue(MV_COL1_NO_INDEX, mvNoIndexCol1);
+
     return record;
   }
 
@@ -232,38 +239,49 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
 
     List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
     records.add(createMockRecord(1, 2, 3, true, 1.1, 2, "daffy", 10.1, 20, 30, 
100.1,
-        "{\"first\": \"daffy\", \"last\": " + "\"duck\"}", "daffy", 1));
+        "{\"first\": \"daffy\", \"last\": " + "\"duck\"}", "daffy", 1, new 
Object[]{1, 2, 3}, new Object[]{1, 2, 3}));
     records.add(createMockRecord(0, 1, 2, false, 0.1, 1, "mickey", 0.1, 10, 
20, 100.2,
-        "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 0));
+        "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 0, new 
Object[]{2, 3, 4},
+        new Object[]{2, 3, 4}));
     records.add(createMockRecord(3, 4, 5, true, 2.1, 3, "mickey", 20.1, 30, 
40, 100.3,
-        "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 3));
+        "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 3, new 
Object[]{3, 4, 5},
+        new Object[]{3, 4, 5}));
     ImmutableSegment immutableSegment1 = createImmutableSegment(records, 
SEGMENT_NAME_1);
 
     List<GenericRow> records2 = new ArrayList<>(NUM_RECORDS);
     records2.add(createMockRecord(5, 2, 3, true, 1.1, 2, "pluto", 10.1, 20, 
30, 100.1,
-        "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 5));
+        "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 5, new 
Object[]{100, 200, 300},
+        new Object[]{100, 200, 300}));
     records2.add(createMockRecord(6, 1, 2, false, 0.1, 1, "pluto", 0.1, 10, 
20, 100.2,
-        "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 6));
+        "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 6, new 
Object[]{200, 300, 400},
+        new Object[]{200, 300, 400}));
     records2.add(createMockRecord(8, 4, 5, true, 2.1, 3, "pluto", 20.1, 30, 
40, 100.3,
-        "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 8));
+        "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 8, new 
Object[]{300, 400, 500},
+        new Object[]{300, 400, 500}));
     ImmutableSegment immutableSegment2 = createImmutableSegment(records2, 
SEGMENT_NAME_2);
 
     List<GenericRow> records3 = new ArrayList<>(NUM_RECORDS);
     records3.add(createMockRecord(5, 2, 3, true, 1.5, 2, "donald", 10.1, 20, 
30, 100.1,
-        "{\"first\": \"donald\", \"last\": " + "\"duck\"}", "donald", 1));
+        "{\"first\": \"donald\", \"last\": " + "\"duck\"}", "donald", 1, new 
Object[]{100, 200, 300},
+        new Object[]{100, 200, 300}));
     records3.add(createMockRecord(6, 1, 2, false, 0.1, 1, "goofy", 0.1, 10, 
20, 100.2,
-        "{\"first\": \"goofy\", \"last\": " + "\"dog\"}", "goofy", 1));
+        "{\"first\": \"goofy\", \"last\": " + "\"dog\"}", "goofy", 1, new 
Object[]{100, 200, 300},
+        new Object[]{100, 200, 300}));
     records3.add(createMockRecord(7, 4, 5, true, 2.1, 3, "minnie", 20.1, 30, 
40, 100.3,
-        "{\"first\": \"minnie\", \"last\": " + "\"mouse\"}", "minnie", 1));
+        "{\"first\": \"minnie\", \"last\": " + "\"mouse\"}", "minnie", 1, new 
Object[]{1000, 2000, 3000},
+        new Object[]{1000, 2000, 3000}));
     ImmutableSegment immutableSegment3 = createImmutableSegment(records3, 
SEGMENT_NAME_3);
 
     List<GenericRow> records4 = new ArrayList<>(NUM_RECORDS);
     records4.add(createMockRecord(5, 2, 3, true, 1.1, 2, "tweety", 10.1, 20, 
30, 100.1,
-        "{\"first\": \"tweety\", \"last\": " + "\"bird\"}", "tweety", 5));
+        "{\"first\": \"tweety\", \"last\": " + "\"bird\"}", "tweety", 5, new 
Object[]{100, 200, 300},
+        new Object[]{100, 200, 300}));
     records4.add(createMockRecord(6, 1, 2, false, 0.1, 1, "bugs", 0.1, 10, 20, 
100.2,
-        "{\"first\": \"bugs\", \"last\": " + "\"bunny\"}", "bugs", 6));
+        "{\"first\": \"bugs\", \"last\": " + "\"bunny\"}", "bugs", 6, new 
Object[]{100, 200, 300},
+        new Object[]{100, 200, 300}));
     records4.add(createMockRecord(7, 4, 5, true, 2.1, 3, "sylvester", 20.1, 
30, 40, 100.3,
-        "{\"first\": \"sylvester\", \"last\": " + "\"cat\"}", "sylvester", 7));
+        "{\"first\": \"sylvester\", \"last\": " + "\"cat\"}", "sylvester", 7, 
new Object[]{1000, 2000, 3000},
+        new Object[]{1000, 2000, 3000}));
     ImmutableSegment immutableSegment4 = createImmutableSegment(records4, 
SEGMENT_NAME_4);
 
     _indexSegment = immutableSegment1;
@@ -371,15 +389,16 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
     result1.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)", 
ExplainPlanRows.PLAN_START_IDS,
         ExplainPlanRows.PLAN_START_IDS});
     result1.add(new Object[]{
-        "SELECT(selectList:invertedIndexCol1, invertedIndexCol2, 
invertedIndexCol3, jsonIndexCol1, "
-            + "noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, 
rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, "
-            + "rawCol1, sortedIndexCol1, textIndexCol1)", 3, 2});
+        "SELECT(selectList:invertedIndexCol1, invertedIndexCol2, 
invertedIndexCol3, jsonIndexCol1, mvNoIndexCol1, "
+            + "mvRawCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, 
rangeIndexCol1, rangeIndexCol2, "
+            + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 3, 
2});
     result1.add(new Object[]{"TRANSFORM_PASSTHROUGH(invertedIndexCol1, 
invertedIndexCol2, invertedIndexCol3, "
-        + "jsonIndexCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, 
rangeIndexCol1, rangeIndexCol2, "
-        + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 4, 3});
-    result1.add(new Object[]{"PROJECT(noIndexCol4, rawCol1, sortedIndexCol1, 
noIndexCol3, rangeIndexCol1, "
-        + "rangeIndexCol2, invertedIndexCol1, noIndexCol2, invertedIndexCol2, 
noIndexCol1, rangeIndexCol3, "
-        + "textIndexCol1, jsonIndexCol1, invertedIndexCol3)", 5, 4});
+        + "jsonIndexCol1, mvNoIndexCol1, mvRawCol1, noIndexCol1, noIndexCol2, 
noIndexCol3, noIndexCol4, "
+        + "rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, rawCol1, 
sortedIndexCol1, textIndexCol1)", 4, 3});
+    result1.add(new Object[]{"PROJECT(noIndexCol4, rawCol1, sortedIndexCol1, 
noIndexCol3, mvNoIndexCol1"
+        + ", rangeIndexCol1, rangeIndexCol2, invertedIndexCol1, noIndexCol2, 
invertedIndexCol2, noIndexCol1, "
+        + "rangeIndexCol3, textIndexCol1, mvRawCol1, jsonIndexCol1, 
invertedIndexCol3)", 5, 4
+    });
     result1.add(new Object[]{"DOC_ID_SET", 6, 5});
     result1.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6});
     check(query1, new ResultTable(DATA_SCHEMA, result1));
@@ -435,15 +454,19 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
     result1.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)", 
ExplainPlanRows.PLAN_START_IDS,
         ExplainPlanRows.PLAN_START_IDS});
     result1.add(new Object[]{
-        "SELECT(selectList:invertedIndexCol1, invertedIndexCol2, 
invertedIndexCol3, jsonIndexCol1, "
-            + "noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, 
rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, "
-            + "rawCol1, sortedIndexCol1, textIndexCol1)", 3, 2});
+        "SELECT(selectList:invertedIndexCol1, invertedIndexCol2, 
invertedIndexCol3, jsonIndexCol1, mvNoIndexCol1, "
+            + "mvRawCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, 
rangeIndexCol1, rangeIndexCol2, "
+            + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 3, 2
+    });
     result1.add(new Object[]{"TRANSFORM_PASSTHROUGH(invertedIndexCol1, 
invertedIndexCol2, invertedIndexCol3, "
-        + "jsonIndexCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, 
rangeIndexCol1, rangeIndexCol2, "
-        + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 4, 3});
-    result1.add(new Object[]{"PROJECT(noIndexCol4, rawCol1, sortedIndexCol1, 
noIndexCol3, rangeIndexCol1, "
-        + "rangeIndexCol2, invertedIndexCol1, noIndexCol2, invertedIndexCol2, 
noIndexCol1, rangeIndexCol3, "
-        + "textIndexCol1, jsonIndexCol1, invertedIndexCol3)", 5, 4});
+        + "jsonIndexCol1, mvNoIndexCol1, mvRawCol1, noIndexCol1, noIndexCol2, 
noIndexCol3, noIndexCol4, "
+        + "rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, rawCol1, 
sortedIndexCol1, textIndexCol1)", 4, 3
+    });
+    result1.add(new Object[]{
+        "PROJECT(noIndexCol4, rawCol1, sortedIndexCol1, noIndexCol3, 
mvNoIndexCol1, "
+            + "rangeIndexCol1, rangeIndexCol2, invertedIndexCol1, noIndexCol2, 
invertedIndexCol2, noIndexCol1, "
+            + "rangeIndexCol3, textIndexCol1, mvRawCol1, jsonIndexCol1, 
invertedIndexCol3)", 5, 4
+    });
     result1.add(new Object[]{"DOC_ID_SET", 6, 5});
     result1.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6});
     check(query1, new ResultTable(DATA_SCHEMA, result1));
@@ -1703,19 +1726,42 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
     result5.add(new Object[]{"AGGREGATE_NO_SCAN", 3, 2});
     check(query5, new ResultTable(DATA_SCHEMA, result5));
 
-    // Full scan required for distinctavg as the column does not have a 
dictionary.
-    String query6 = "EXPLAIN PLAN FOR SELECT DISTINCTAVG(rawCol1) FROM 
testTable";
+    String query6 = "EXPLAIN PLAN FOR SELECT DISTINCTSUMMV(mvNoIndexCol1) FROM 
testTable";
     List<Object[]> result6 = new ArrayList<>();
     result6.add(new Object[]{"BROKER_REDUCE(limit:10)", 1, 0});
     result6.add(new Object[]{"COMBINE_AGGREGATE", 2, 1});
-    result6.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)", 
ExplainPlanRows.PLAN_START_IDS,
-        ExplainPlanRows.PLAN_START_IDS});
-    result6.add(new Object[]{"AGGREGATE(aggregations:distinctAvg(rawCol1))", 
3, 2});
-    result6.add(new Object[]{"TRANSFORM_PASSTHROUGH(rawCol1)", 4, 3});
-    result6.add(new Object[]{"PROJECT(rawCol1)", 5, 4});
-    result6.add(new Object[]{"DOC_ID_SET", 6, 5});
-    result6.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6});
+    result6.add(new Object[]{
+        "PLAN_START(numSegmentsForThisPlan:4)", 
ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS
+    });
+    result6.add(new Object[]{"AGGREGATE_NO_SCAN", 3, 2});
     check(query6, new ResultTable(DATA_SCHEMA, result6));
+
+    // Full scan required for distinctavg as the column does not have a 
dictionary.
+    String query7 = "EXPLAIN PLAN FOR SELECT DISTINCTAVG(rawCol1) FROM 
testTable";
+    List<Object[]> result7 = new ArrayList<>();
+    result7.add(new Object[]{"BROKER_REDUCE(limit:10)", 1, 0});
+    result7.add(new Object[]{"COMBINE_AGGREGATE", 2, 1});
+    result7.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)", 
ExplainPlanRows.PLAN_START_IDS,
+        ExplainPlanRows.PLAN_START_IDS});
+    result7.add(new Object[]{"AGGREGATE(aggregations:distinctAvg(rawCol1))", 
3, 2});
+    result7.add(new Object[]{"TRANSFORM_PASSTHROUGH(rawCol1)", 4, 3});
+    result7.add(new Object[]{"PROJECT(rawCol1)", 5, 4});
+    result7.add(new Object[]{"DOC_ID_SET", 6, 5});
+    result7.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6});
+    check(query7, new ResultTable(DATA_SCHEMA, result7));
+
+    String query8 = "EXPLAIN PLAN FOR SELECT DISTINCTAVGMV(mvRawCol1) FROM 
testTable";
+    List<Object[]> result8 = new ArrayList<>();
+    result8.add(new Object[]{"BROKER_REDUCE(limit:10)", 1, 0});
+    result8.add(new Object[]{"COMBINE_AGGREGATE", 2, 1});
+    result8.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)", 
ExplainPlanRows.PLAN_START_IDS,
+        ExplainPlanRows.PLAN_START_IDS});
+    result8.add(new 
Object[]{"AGGREGATE(aggregations:distinctAvgMV(mvRawCol1))", 3, 2});
+    result8.add(new Object[]{"TRANSFORM_PASSTHROUGH(mvRawCol1)", 4, 3});
+    result8.add(new Object[]{"PROJECT(mvRawCol1)", 5, 4});
+    result8.add(new Object[]{"DOC_ID_SET", 6, 5});
+    result8.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6});
+    check(query8, new ResultTable(DATA_SCHEMA, result8));
   }
 
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index 760c1c78c1..4d32358814 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -121,7 +121,8 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
   public void testMaxMV() {
     String query = "SELECT MAXMV(column6) AS value FROM testTable";
 
-    // Without filter, query should be answered by 
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // Without filter, query should be answered by 
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // for dictionary based columns.
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new 
ColumnDataType[]{ColumnDataType.DOUBLE});
     ResultTable expectedResultTable =
@@ -148,7 +149,8 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
   public void testMinMV() {
     String query = "SELECT MINMV(column6) AS value FROM testTable";
 
-    // Without filter, query should be answered by 
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // Without filter, query should be answered by 
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // for dictionary based columns.
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new 
ColumnDataType[]{ColumnDataType.DOUBLE});
     Object[] expectedResults = new Object[]{1001.0};
@@ -245,7 +247,8 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
   public void testMinMaxRangeMV() {
     String query = "SELECT MINMAXRANGEMV(column6) AS value FROM testTable";
 
-    // Without filter, query should be answered by 
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // Without filter, query should be answered by 
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // for dictionary based columns.
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new 
ColumnDataType[]{ColumnDataType.DOUBLE});
     Object[] expectedResults = new Object[]{2147482646.0};
@@ -277,7 +280,8 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
   public void testDistinctCountMV() {
     String query = "SELECT DISTINCTCOUNTMV(column6) AS value FROM testTable";
 
-    // Without filter, query should be answered by 
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // Without filter, query should be answered by 
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // for dictionary based columns.
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new 
ColumnDataType[]{ColumnDataType.INT});
     Object[] expectedResults = new Object[]{18499};
@@ -305,11 +309,78 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
     QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 
124960L, 400000L, expectedResultTable);
   }
 
+  @Test
+  public void testDistinctSumMV() {
+    String query = "SELECT DISTINCTSUMMV(column6) AS value FROM testTable";
+
+    // Without filter, query should be answered by 
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // for dictionary based columns.
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new 
ColumnDataType[]{ColumnDataType.DOUBLE});
+    Object[] expectedResults = new Object[]{24592775810.0};
+    ResultTable expectedResultTable = new ResultTable(expectedDataSchema, 
Collections.singletonList(expectedResults));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 0L, 
400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + FILTER);
+    expectedResults[0] = 2578123532.0;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 
62480L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + SV_GROUP_BY);
+    expectedResults[0] = 6304833321.0;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 
800000L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY);
+    expectedResults[0] = 2578123532.0;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 
124960L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + MV_GROUP_BY);
+    expectedResults[0] = 8999975927.0;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 
800000L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY);
+    expectedResults[0] = 2478539095.0;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 
124960L, 400000L, expectedResultTable);
+  }
+
+  @Test
+  public void testDistinctAvgMV() {
+    String query = "SELECT DISTINCTAVGMV(column6) AS value FROM testTable";
+
+    // Without filter, query should be answered by 
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // for dictionary based columns.
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new 
ColumnDataType[]{ColumnDataType.DOUBLE});
+    Object[] expectedResults = new Object[]{1329411.0930320558};
+    ResultTable expectedResultTable = new ResultTable(expectedDataSchema, 
Collections.singletonList(expectedResults));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 0L, 
400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + FILTER);
+    expectedResults[0] = 2173797.244519393;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 
62480L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + SV_GROUP_BY);
+    expectedResults[0] = 2147483647.0;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 
800000L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY);
+    expectedResults[0] = 2147483647.0;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 
124960L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + MV_GROUP_BY);
+    expectedResults[0] = 2147483647.0;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 
800000L, 400000L, expectedResultTable);
+
+    brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY);
+    expectedResults[0] = 2147483647.0;
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 
124960L, 400000L, expectedResultTable);
+  }
+
   @Test
   public void testDistinctCountHLLMV() {
     String query = "SELECT DISTINCTCOUNTHLLMV(column6) AS value FROM 
testTable";
 
-    // Without filter, query should be answered by 
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // Without filter, query should be answered by 
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // for dictionary based columns.
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new 
ColumnDataType[]{ColumnDataType.LONG});
     Object[] expectedResults = new Object[]{20039L};
@@ -343,7 +414,8 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
     Function<Object, Object> cardinalityExtractor =
         value -> 
ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(BytesUtils.toBytes((String) 
value)).cardinality();
 
-    // Without filter, query should be answered by 
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // Without filter, query should be answered by 
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+    // for dictionary based columns.
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new 
ColumnDataType[]{ColumnDataType.LONG});
     Object[] expectedResults = new Object[]{20039L};
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index fc2aec9bc9..8f53122bd4 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -82,6 +82,8 @@ public enum AggregationFunctionType {
   DISTINCTCOUNTBITMAPMV("distinctCountBitmapMV"),
   DISTINCTCOUNTHLLMV("distinctCountHLLMV"),
   DISTINCTCOUNTRAWHLLMV("distinctCountRawHLLMV"),
+  DISTINCTSUMMV("distinctSumMV"),
+  DISTINCTAVGMV("distinctAvgMV"),
   PERCENTILEMV("percentileMV"),
   PERCENTILEESTMV("percentileEstMV"),
   PERCENTILERAWESTMV("percentileRawEstMV"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to