This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 272cddd8dd2 Star-tree index aggregate on MV columns (#16836)
272cddd8dd2 is described below

commit 272cddd8dd2ff69a98907676691336787c4874e1
Author: Mohemmad Zaid Khan <[email protected]>
AuthorDate: Fri Oct 24 21:02:13 2025 +0530

    Star-tree index aggregate on MV columns (#16836)
---
 .../function/AvgAggregationFunction.java           | 32 ++++-----
 .../function/AvgMVAggregationFunction.java         | 32 ++++++++-
 .../function/CountMVAggregationFunction.java       | 25 +++++++
 .../function/SumMVAggregationFunction.java         | 33 ++++++++++
 .../core/startree/v2/AvgMVStarTreeV2Test.java      | 57 ++++++++++++++++
 .../pinot/core/startree/v2/BaseStarTreeV2Test.java | 31 ++++++---
 .../core/startree/v2/CountMVStarTreeV2Test.java    | 55 ++++++++++++++++
 .../core/startree/v2/SumMVStarTreeV2Test.java      | 55 ++++++++++++++++
 .../src/test/resources/TableIndexingTest.csv       | 16 ++---
 .../tests/StarTreeClusterIntegrationTest.java      | 46 ++++++++++++-
 ...rmance_2014_100k_subset_nonulls_columns.schema} |  3 +-
 .../local/aggregator/AvgMVValueAggregator.java     | 77 ++++++++++++++++++++++
 .../local/aggregator/CountMVValueAggregator.java   | 69 +++++++++++++++++++
 .../local/aggregator/SumMVValueAggregator.java     | 69 +++++++++++++++++++
 .../local/aggregator/ValueAggregatorFactory.java   | 12 ++++
 .../segment/local/utils/TableConfigUtils.java      | 21 +++---
 .../local/aggregator/ValueAggregatorTest.java      |  3 +
 .../airlineStats_offline_table_config.json         | 11 ++++
 .../batch/airlineStats/airlineStats_schema.json    |  2 +-
 19 files changed, 599 insertions(+), 50 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
index 0e92c70b3bd..39dc8146709 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
@@ -81,12 +81,10 @@ public class AvgAggregationFunction extends 
NullableSingleInputAggregationFuncti
       // Serialized AvgPair
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
       AvgPair avgPair = new AvgPair();
-      forEachNotNull(length, blockValSet, (from, to) -> {
-        for (int i = from; i < to; i++) {
-          AvgPair value = 
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
-          avgPair.apply(value);
-        }
-      });
+      for (int i = 0; i < length; i++) {
+        AvgPair value = 
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+        avgPair.apply(value);
+      }
       // Only set the aggregation result when there is at least one non-null 
input value
       if (avgPair.getCount() != 0) {
         updateAggregationResult(aggregationResultHolder, avgPair.getSum(), 
avgPair.getCount());
@@ -118,12 +116,10 @@ public class AvgAggregationFunction extends 
NullableSingleInputAggregationFuncti
     } else {
       // Serialized AvgPair
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
-      forEachNotNull(length, blockValSet, (from, to) -> {
-        for (int i = from; i < to; i++) {
-          AvgPair avgPair = 
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
-          updateGroupByResult(groupKeyArray[i], groupByResultHolder, 
avgPair.getSum(), avgPair.getCount());
-        }
-      });
+      for (int i = 0; i < length; i++) {
+        AvgPair avgPair = 
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+        updateGroupByResult(groupKeyArray[i], groupByResultHolder, 
avgPair.getSum(), avgPair.getCount());
+      }
     }
   }
 
@@ -144,14 +140,12 @@ public class AvgAggregationFunction extends 
NullableSingleInputAggregationFuncti
     } else {
       // Serialized AvgPair
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
-      forEachNotNull(length, blockValSet, (from, to) -> {
-        for (int i = from; i < to; i++) {
-          AvgPair avgPair = 
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
-          for (int groupKey : groupKeysArray[i]) {
-            updateGroupByResult(groupKey, groupByResultHolder, 
avgPair.getSum(), avgPair.getCount());
-          }
+      for (int i = 0; i < length; i++) {
+        AvgPair avgPair = 
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+        for (int groupKey : groupKeysArray[i]) {
+          updateGroupByResult(groupKey, groupByResultHolder, avgPair.getSum(), 
avgPair.getCount());
         }
-      });
+      }
     }
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
index a81af44622e..d9aadc1b9c2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.AvgPair;
@@ -43,8 +44,23 @@ public class AvgMVAggregationFunction extends 
AvgAggregationFunction {
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
-    double[][] valuesArray = blockValSet.getDoubleValuesMV();
 
+    if (blockValSet.isSingleValue()) {
+      // star-tree pre-aggregated values: During star-tree creation, the 
multi-value column is pre-aggregated
+      // per star-tree node, resulting in a single value per node.
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      AvgPair avgPair = new AvgPair();
+      for (int i = 0; i < length; i++) {
+        AvgPair value = 
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+        avgPair.apply(value);
+      }
+      if (avgPair.getCount() != 0) {
+        updateAggregationResult(aggregationResultHolder, avgPair.getSum(), 
avgPair.getCount());
+      }
+      return;
+    }
+
+    double[][] valuesArray = blockValSet.getDoubleValuesMV();
     AvgPair avgPair = new AvgPair();
     forEachNotNull(length, blockValSet, (from, to) -> {
       for (int i = from; i < to; i++) {
@@ -53,7 +69,6 @@ public class AvgMVAggregationFunction extends 
AvgAggregationFunction {
         }
       }
     });
-
     if (avgPair.getCount() != 0) {
       updateAggregationResult(aggregationResultHolder, avgPair.getSum(), 
avgPair.getCount());
     }
@@ -63,8 +78,19 @@ public class AvgMVAggregationFunction extends 
AvgAggregationFunction {
   public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
-    double[][] valuesArray = blockValSet.getDoubleValuesMV();
 
+    if (blockValSet.isSingleValue()) {
+      // star-tree pre-aggregated values: During star-tree creation, the 
multi-value column is pre-aggregated
+      // per star-tree node, resulting in a single value per node.
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        AvgPair avgPair = 
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+        updateGroupByResult(groupKeyArray[i], groupByResultHolder, 
avgPair.getSum(), avgPair.getCount());
+      }
+      return;
+    }
+
+    double[][] valuesArray = blockValSet.getDoubleValuesMV();
     forEachNotNull(length, blockValSet, (from, to) -> {
       for (int i = from; i < to; i++) {
         aggregateOnGroupKey(groupKeyArray[i], groupByResultHolder, 
valuesArray[i]);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
index 153aa1c37fc..f6d213584ec 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
@@ -53,6 +53,19 @@ public class CountMVAggregationFunction extends 
CountAggregationFunction {
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.isSingleValue()) {
+      // star-tree pre-aggregated values: During star-tree creation, the 
multi-value column is pre-aggregated
+      // per star-tree node, resulting in a single value per node.
+      long[] valueArray = blockValSet.getLongValuesSV();
+      long count = 0;
+      for (int i = 0; i < length; i++) {
+        count += valueArray[i];
+      }
+      
aggregationResultHolder.setValue(aggregationResultHolder.getDoubleResult() + 
count);
+      return;
+    }
+
     int[] valueArray = blockValSet.getNumMVEntries();
     // Hack to make count effectively final for use in the lambda (we know 
that there aren't concurrent access issues
     // with forEachNotNull)
@@ -69,6 +82,18 @@ public class CountMVAggregationFunction extends 
CountAggregationFunction {
   public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.isSingleValue()) {
+      // star-tree pre-aggregated values: During star-tree creation, the 
multi-value column is pre-aggregated
+      // per star-tree node, resulting in a single value per node.
+      long[] valueArray = blockValSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {
+        int groupKey = groupKeyArray[i];
+        groupByResultHolder.setValueForKey(groupKey, 
groupByResultHolder.getDoubleResult(groupKey) + valueArray[i]);
+      }
+      return;
+    }
+
     int[] valueArray = blockValSet.getNumMVEntries();
     forEachNotNull(length, blockValSet, (from, to) -> {
       for (int i = from; i < to; i++) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
index f7d914d2dbc..b0cae2534d3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
@@ -42,6 +42,19 @@ public class SumMVAggregationFunction extends 
SumAggregationFunction {
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.isSingleValue()) {
+      // star-tree pre-aggregated values: During star-tree creation, the 
multi-value column is pre-aggregated
+      // per star-tree node, resulting in a single value per node.
+      double[] valueArray = blockValSet.getDoubleValuesSV();
+      double sum = 0.0;
+      for (int i = 0; i < length; i++) {
+        sum += valueArray[i];
+      }
+      updateAggregationResultHolder(aggregationResultHolder, sum);
+      return;
+    }
+
     double[][] valuesArray = blockValSet.getDoubleValuesMV();
 
     Double sum;
@@ -62,6 +75,26 @@ public class SumMVAggregationFunction extends 
SumAggregationFunction {
   public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.isSingleValue()) {
+      // star-tree pre-aggregated values: During star-tree creation, the 
multi-value column is pre-aggregated
+      // per star-tree node, resulting in a single value per node.
+      double[] valueArray = blockValSet.getDoubleValuesSV();
+      if (_nullHandlingEnabled) {
+        for (int i = 0; i < length; i++) {
+          int groupKey = groupKeyArray[i];
+          Double result = groupByResultHolder.getResult(groupKey);
+          groupByResultHolder.setValueForKey(groupKey, result == null ? 
valueArray[i] : result + valueArray[i]);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          int groupKey = groupKeyArray[i];
+          groupByResultHolder.setValueForKey(groupKey, 
groupByResultHolder.getDoubleResult(groupKey) + valueArray[i]);
+        }
+      }
+      return;
+    }
+
     double[][] valuesArray = blockValSet.getDoubleValuesMV();
 
     if (_nullHandlingEnabled) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AvgMVStarTreeV2Test.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AvgMVStarTreeV2Test.java
new file mode 100644
index 00000000000..91b56716128
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AvgMVStarTreeV2Test.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Random;
+import org.apache.pinot.segment.local.aggregator.AvgMVValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class AvgMVStarTreeV2Test extends BaseStarTreeV2Test<Object, AvgPair> {
+
+  @Override
+  ValueAggregator<Object, AvgPair> getValueAggregator() {
+    return new AvgMVValueAggregator();
+  }
+
+  @Override
+  DataType getRawValueType() {
+    return DataType.INT;
+  }
+
+  @Override
+  Object getRandomRawValue(Random random) {
+    return random.nextInt();
+  }
+
+  @Override
+  protected void assertAggregatedValue(AvgPair starTreeResult, AvgPair 
nonStarTreeResult) {
+    assertEquals(starTreeResult.getSum(), nonStarTreeResult.getSum(), 1e-5);
+    assertEquals(starTreeResult.getCount(), nonStarTreeResult.getCount());
+  }
+
+  @Override
+  protected boolean isAggColSingleValueField() {
+    return false;
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index f94f4d58e05..306daf40bb2 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -98,7 +98,7 @@ abstract class BaseStarTreeV2Test<R, A> {
   private static final String DIMENSION1 = "d1__COLUMN_NAME";
   private static final String DIMENSION2 = "DISTINCTCOUNTRAWHLL__d2";
   private static final int DIMENSION_CARDINALITY = 100;
-  private static final String METRIC = "m";
+  private static final String AGG_COL = "m";
 
   // Supported filters
   private static final String QUERY_FILTER_AND = String.format(" WHERE %1$s = 
0 AND %2$s < 10", DIMENSION1, DIMENSION2);
@@ -151,7 +151,8 @@ abstract class BaseStarTreeV2Test<R, A> {
     DataType rawValueType = getRawValueType();
     // Raw value type will be null for COUNT aggregation function
     if (rawValueType != null) {
-      schemaBuilder.addMetric(METRIC, rawValueType);
+      schemaBuilder.addDimensionField(AGG_COL, rawValueType,
+              dimensionFieldSpec -> 
dimensionFieldSpec.setSingleValueField(isAggColSingleValueField()));
     }
     Schema schema = schemaBuilder.build();
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
@@ -162,7 +163,7 @@ abstract class BaseStarTreeV2Test<R, A> {
       segmentRecord.putValue(DIMENSION1, 
RANDOM.nextInt(DIMENSION_CARDINALITY));
       segmentRecord.putValue(DIMENSION2, 
RANDOM.nextInt(DIMENSION_CARDINALITY));
       if (rawValueType != null) {
-        segmentRecord.putValue(METRIC, getRandomRawValue(RANDOM));
+        segmentRecord.putValue(AGG_COL, getRandomRawValue(RANDOM));
       }
       segmentRecords.add(segmentRecord);
     }
@@ -175,8 +176,9 @@ abstract class BaseStarTreeV2Test<R, A> {
     driver.build();
 
     StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Arrays.asList(DIMENSION1, DIMENSION2), null, null,
-        Collections.singletonList(new StarTreeAggregationConfig(METRIC, 
_valueAggregator.getAggregationType().getName(),
-            null, getCompressionCodec(), true, getIndexVersion(), null, 
null)), MAX_LEAF_RECORDS);
+        Collections.singletonList(new StarTreeAggregationConfig(AGG_COL,
+                _valueAggregator.getAggregationType().getName(), null, 
getCompressionCodec(),
+                true, getIndexVersion(), null, null)), MAX_LEAF_RECORDS);
     File indexDir = new File(TEMP_DIR, SEGMENT_NAME);
     // Randomly build star-tree using on-heap or off-heap mode
     MultipleTreesBuilder.BuildMode buildMode =
@@ -196,9 +198,9 @@ abstract class BaseStarTreeV2Test<R, A> {
     } else if (aggregationType == AggregationFunctionType.PERCENTILEEST
         || aggregationType == AggregationFunctionType.PERCENTILETDIGEST) {
       // Append a percentile number for percentile functions
-      return String.format("%s(%s, 50)", aggregationType.getName(), METRIC);
+      return String.format("%s(%s, 50)", aggregationType.getName(), AGG_COL);
     } else {
-      return String.format("%s(%s)", aggregationType.getName(), METRIC);
+      return String.format("%s(%s)", aggregationType.getName(), AGG_COL);
     }
   }
 
@@ -476,7 +478,16 @@ abstract class BaseStarTreeV2Test<R, A> {
 
   private Object getNextRawValue(int docId, ForwardIndexReader reader, 
ForwardIndexReaderContext readerContext,
       Dictionary dictionary) {
-    return dictionary.get(reader.getDictId(docId, readerContext));
+    if (isAggColSingleValueField()) {
+      return dictionary.get(reader.getDictId(docId, readerContext));
+    } else {
+      int[] dictIds = reader.getDictIdMV(docId, readerContext);
+        Object[] rawValue = new Object[dictIds.length];
+        for (int i = 0; i < dictIds.length; i++) {
+          rawValue[i] = dictionary.get(dictIds[i]);
+        }
+        return rawValue;
+    }
   }
 
   /**
@@ -499,6 +510,10 @@ abstract class BaseStarTreeV2Test<R, A> {
     return version > 1 ? version : null;
   }
 
+  protected boolean isAggColSingleValueField() {
+    return true;
+  }
+
   abstract ValueAggregator<R, A> getValueAggregator();
 
   abstract DataType getRawValueType();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/CountMVStarTreeV2Test.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/CountMVStarTreeV2Test.java
new file mode 100644
index 00000000000..5c2e23a162d
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/CountMVStarTreeV2Test.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Random;
+import org.apache.pinot.segment.local.aggregator.CountMVValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class CountMVStarTreeV2Test extends BaseStarTreeV2Test<Object, Long> {
+
+  @Override
+  ValueAggregator<Object, Long> getValueAggregator() {
+    return new CountMVValueAggregator();
+  }
+
+  @Override
+  DataType getRawValueType() {
+    return DataType.INT;
+  }
+
+  @Override
+  Object getRandomRawValue(Random random) {
+    return random.nextInt();
+  }
+
+  @Override
+  protected void assertAggregatedValue(Long starTreeResult, Long 
nonStarTreeResult) {
+    assertEquals(starTreeResult, nonStarTreeResult);
+  }
+
+  @Override
+  protected boolean isAggColSingleValueField() {
+    return false;
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/SumMVStarTreeV2Test.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/SumMVStarTreeV2Test.java
new file mode 100644
index 00000000000..009a7a9863b
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/SumMVStarTreeV2Test.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Random;
+import org.apache.pinot.segment.local.aggregator.SumMVValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class SumMVStarTreeV2Test extends BaseStarTreeV2Test<Object, Double> {
+
+  @Override
+  ValueAggregator<Object, Double> getValueAggregator() {
+    return new SumMVValueAggregator();
+  }
+
+  @Override
+  DataType getRawValueType() {
+    return DataType.INT;
+  }
+
+  @Override
+  Object getRandomRawValue(Random random) {
+    return random.nextInt();
+  }
+
+  @Override
+  protected void assertAggregatedValue(Double starTreeResult, Double 
nonStarTreeResult) {
+    assertEquals(starTreeResult, nonStarTreeResult, 1e-5);
+  }
+
+  @Override
+  protected boolean isAggColSingleValueField() {
+    return false;
+  }
+}
diff --git a/pinot-core/src/test/resources/TableIndexingTest.csv 
b/pinot-core/src/test/resources/TableIndexingTest.csv
index 420f2c72fe7..96042989c34 100644
--- a/pinot-core/src/test/resources/TableIndexingTest.csv
+++ b/pinot-core/src/test/resources/TableIndexingTest.csv
@@ -44,7 +44,7 @@ INT;mv;dict;json_index;false;Cannot create JSON index on 
multi-value column: col
 INT;mv;dict;native_text_index;false;Cannot create TEXT index on column: col of 
stored type other than STRING
 INT;mv;dict;text_index;false;Cannot create TEXT index on column: col of stored 
type other than STRING
 INT;mv;dict;range_index;true;
-INT;mv;dict;startree_index;false;Star-tree index can only be created on 
single-value columns, but found multi-value column: col
+INT;mv;dict;startree_index;false;Star-tree dimension columns must be 
single-value, but found multi-value column: col
 INT;mv;dict;vector_index;false;Cannot create vector index on column: col of 
stored type other than FLOAT
 INT;mv;dict;multi_col_text_index;false;Cannot create TEXT index on column: col 
of stored type other than STRING
 LONG;sv;raw;timestamp_index;true;
@@ -92,7 +92,7 @@ LONG;mv;dict;json_index;false;Cannot create JSON index on 
multi-value column: co
 LONG;mv;dict;native_text_index;false;Cannot create TEXT index on column: col 
of stored type other than STRING
 LONG;mv;dict;text_index;false;Cannot create TEXT index on column: col of 
stored type other than STRING
 LONG;mv;dict;range_index;true;
-LONG;mv;dict;startree_index;false;Star-tree index can only be created on 
single-value columns, but found multi-value column: col
+LONG;mv;dict;startree_index;false;Star-tree dimension columns must be 
single-value, but found multi-value column: col
 LONG;mv;dict;vector_index;false;Cannot create vector index on column: col of 
stored type other than FLOAT
 LONG;mv;dict;multi_col_text_index;false;Cannot create TEXT index on column: 
col of stored type other than STRING
 FLOAT;sv;raw;timestamp_index;false;Cannot create TIMESTAMP index on column: 
col of stored type other than LONG
@@ -140,7 +140,7 @@ FLOAT;mv;dict;json_index;false;Cannot create JSON index on 
multi-value column: c
 FLOAT;mv;dict;native_text_index;false;Cannot create TEXT index on column: col 
of stored type other than STRING
 FLOAT;mv;dict;text_index;false;Cannot create TEXT index on column: col of 
stored type other than STRING
 FLOAT;mv;dict;range_index;true;
-FLOAT;mv;dict;startree_index;false;Star-tree index can only be created on 
single-value columns, but found multi-value column: col
+FLOAT;mv;dict;startree_index;false;Star-tree dimension columns must be 
single-value, but found multi-value column: col
 FLOAT;mv;dict;vector_index;true;
 FLOAT;mv;dict;multi_col_text_index;false;Cannot create TEXT index on column: 
col of stored type other than STRING
 DOUBLE;sv;raw;timestamp_index;false;Cannot create TIMESTAMP index on column: 
col of stored type other than LONG
@@ -188,7 +188,7 @@ DOUBLE;mv;dict;json_index;false;Cannot create JSON index on 
multi-value column:
 DOUBLE;mv;dict;native_text_index;false;Cannot create TEXT index on column: col 
of stored type other than STRING
 DOUBLE;mv;dict;text_index;false;Cannot create TEXT index on column: col of 
stored type other than STRING
 DOUBLE;mv;dict;range_index;true;
-DOUBLE;mv;dict;startree_index;false;Star-tree index can only be created on 
single-value columns, but found multi-value column: col
+DOUBLE;mv;dict;startree_index;false;Star-tree dimension columns must be 
single-value, but found multi-value column: col
 DOUBLE;mv;dict;vector_index;false;Cannot create vector index on column: col of 
stored type other than FLOAT
 DOUBLE;mv;dict;multi_col_text_index;false;Cannot create TEXT index on column: 
col of stored type other than STRING
 DECIMAL;sv_BIG;raw;timestamp_index;false;Cannot create TIMESTAMP index on 
column: col of stored type other than LONG
@@ -260,7 +260,7 @@ BOOLEAN;mv;dict;json_index;false;Cannot create JSON index 
on multi-value column:
 BOOLEAN;mv;dict;native_text_index;false;Cannot create TEXT index on column: 
col of stored type other than STRING
 BOOLEAN;mv;dict;text_index;false;Cannot create TEXT index on column: col of 
stored type other than STRING
 BOOLEAN;mv;dict;range_index;true;
-BOOLEAN;mv;dict;startree_index;false;Star-tree index can only be created on 
single-value columns, but found multi-value column: col
+BOOLEAN;mv;dict;startree_index;false;Star-tree dimension columns must be 
single-value, but found multi-value column: col
 BOOLEAN;mv;dict;vector_index;false;Cannot create vector index on column: col 
of stored type other than FLOAT
 BOOLEAN;mv;dict;multi_col_text_index;false;Cannot create TEXT index on column: 
col of stored type other than STRING
 TIMESTAMP;sv;raw;timestamp_index;true;
@@ -308,7 +308,7 @@ TIMESTAMP;mv;dict;json_index;false;Cannot create JSON index 
on multi-value colum
 TIMESTAMP;mv;dict;native_text_index;false;Cannot create TEXT index on column: 
col of stored type other than STRING
 TIMESTAMP;mv;dict;text_index;false;Cannot create TEXT index on column: col of 
stored type other than STRING
 TIMESTAMP;mv;dict;range_index;true;
-TIMESTAMP;mv;dict;startree_index;false;Star-tree index can only be created on 
single-value columns, but found multi-value column: col
+TIMESTAMP;mv;dict;startree_index;false;Star-tree dimension columns must be 
single-value, but found multi-value column: col
 TIMESTAMP;mv;dict;vector_index;false;Cannot create vector index on column: col 
of stored type other than FLOAT
 TIMESTAMP;mv;dict;multi_col_text_index;false;Cannot create TEXT index on 
column: col of stored type other than STRING
 STRING;sv;raw;timestamp_index;false;Cannot create TIMESTAMP index on column: 
col of stored type other than LONG
@@ -356,7 +356,7 @@ STRING;mv;dict;json_index;false;Cannot create JSON index on 
multi-value column:
 STRING;mv;dict;native_text_index;true;
 STRING;mv;dict;text_index;true;
 STRING;mv;dict;range_index;true;
-STRING;mv;dict;startree_index;false;Star-tree index can only be created on 
single-value columns, but found multi-value column: col
+STRING;mv;dict;startree_index;false;Star-tree dimension columns must be 
single-value, but found multi-value column: col
 STRING;mv;dict;vector_index;false;Cannot create vector index on column: col of 
stored type other than FLOAT
 STRING;mv;dict;multi_col_text_index;true;
 JSON;sv;raw;timestamp_index;false;Cannot create TIMESTAMP index on column: col 
of stored type other than LONG
@@ -428,7 +428,7 @@ BYTES;mv;dict;json_index;false;Cannot create JSON index on 
multi-value column: c
 BYTES;mv;dict;native_text_index;false;Cannot create TEXT index on column: col 
of stored type other than STRING
 BYTES;mv;dict;text_index;false;Cannot create TEXT index on column: col of 
stored type other than STRING
 BYTES;mv;dict;range_index;false;Caught exception while reading data
-BYTES;mv;dict;startree_index;false;Star-tree index can only be created on 
single-value columns, but found multi-value column: col
+BYTES;mv;dict;startree_index;false;Star-tree dimension columns must be 
single-value, but found multi-value column: col
 BYTES;mv;dict;vector_index;false;Cannot create vector index on column: col of 
stored type other than FLOAT
 BYTES;mv;dict;multi_col_text_index;false;Caught exception while reading data
 STRING;map;raw;timestamp_index;false;Cannot create TIMESTAMP index on column: 
col of stored type other than LONG
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
index 276ffe53b38..6480f933070 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
@@ -63,7 +63,7 @@ import static org.testng.Assert.assertTrue;
 public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest 
{
   public static final String FILTER_STARTREE_INDEX = "FILTER_STARTREE_INDEX";
   private static final String SCHEMA_FILE_NAME =
-      
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema";
+          
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema";
   private static final int NUM_STAR_TREE_DIMENSIONS = 5;
   private static final int NUM_STAR_TREE_METRICS = 5;
   private static final List<AggregationFunctionType> 
AGGREGATION_FUNCTION_TYPES =
@@ -112,7 +112,9 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
     int starTree1MaxLeafRecords = 10;
 
     // Randomly pick some dimensions and metrics for the second star-tree
+    // Exclude TotalAddGTime: it is a multi-value column, so it must not be in the dimension split order.
     List<String> allDimensions = new ArrayList<>(schema.getDimensionNames());
+    allDimensions.remove("TotalAddGTime");
     Collections.shuffle(allDimensions, _random);
     List<String> starTree2Dimensions = allDimensions.subList(0, 
NUM_STAR_TREE_DIMENSIONS);
     List<String> allMetrics = new ArrayList<>(schema.getMetricNames());
@@ -120,10 +122,16 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
     List<String> starTree2Metrics = allMetrics.subList(0, 
NUM_STAR_TREE_METRICS);
     int starTree2MaxLeafRecords = 100;
 
+    // Tests StarTree aggregate for multi-value column
+    List<String> starTree3Dimensions =
+        Arrays.asList("OriginCityName", "DepTimeBlk", "LongestAddGTime", 
"CRSDepTime", "DivArrDelay");
+    int starTree3MaxLeafRecords = 10;
+
     TableConfig tableConfig = createOfflineTableConfig();
     tableConfig.getIndexingConfig().setStarTreeIndexConfigs(
         Arrays.asList(getStarTreeIndexConfig(starTree1Dimensions, 
starTree1Metrics, starTree1MaxLeafRecords),
-            getStarTreeIndexConfig(starTree2Dimensions, starTree2Metrics, 
starTree2MaxLeafRecords)));
+            getStarTreeIndexConfig(starTree2Dimensions, starTree2Metrics, 
starTree2MaxLeafRecords),
+            getStarTreeIndexConfigForMVColAgg(starTree3Dimensions, 
starTree3MaxLeafRecords)));
     addTableConfig(tableConfig);
 
     // Unpack the Avro files
@@ -166,6 +174,14 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
     return new StarTreeIndexConfig(dimensions, null, null, aggregationConfigs, 
maxLeafRecords);
   }
 
+  /** Star-tree config over the given dimensions with COUNTMV, SUMMV and AVGMV aggregations on TotalAddGTime. */
+  private static StarTreeIndexConfig getStarTreeIndexConfigForMVColAgg(List<String> dimensions, int maxLeafRecords) {
+    return new StarTreeIndexConfig(dimensions, null, null, new ArrayList<>(Arrays.asList(
+        new StarTreeAggregationConfig("TotalAddGTime", "COUNTMV"),
+        new StarTreeAggregationConfig("TotalAddGTime", "SUMMV"),
+        new StarTreeAggregationConfig("TotalAddGTime", "AVGMV"))), maxLeafRecords);
+  }
+
   @Test(dataProvider = "useBothQueryEngines")
   public void testGeneratedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
@@ -209,6 +225,32 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
     testStarQuery(starQuery, !useMultiStageQueryEngine);
   }
 
+  /** Runs COUNTMV, SUMMV and AVGMV queries over TotalAddGTime through {@code testStarQuery}. */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testMultiValueColumnAggregations(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    // The second argument to testStarQuery (verifyPlan) is only true for the single-stage engine.
+    boolean verifyPlan = !useMultiStageQueryEngine;
+
+    // Un-grouped, grouped, and filtered aggregations over the multi-value column, issued in this order.
+    String[] starQueries = {
+        "SELECT COUNTMV(TotalAddGTime), SUMMV(TotalAddGTime), AVGMV(TotalAddGTime) FROM mytable",
+        "SELECT OriginCityName, COUNTMV(TotalAddGTime), AVGMV(TotalAddGTime), SUMMV(TotalAddGTime) "
+            + "FROM mytable GROUP BY OriginCityName ORDER BY OriginCityName",
+        "SELECT DepTimeBlk, SUMMV(TotalAddGTime), AVGMV(TotalAddGTime) FROM mytable "
+            + "WHERE CRSDepTime > 1000 GROUP BY DepTimeBlk ORDER BY DepTimeBlk",
+        "SELECT OriginCityName, DepTimeBlk, SUMMV(TotalAddGTime) FROM mytable "
+            + "GROUP BY OriginCityName, DepTimeBlk ORDER BY OriginCityName, DepTimeBlk LIMIT 100",
+        "SELECT CRSDepTime, AVGMV(TotalAddGTime) FROM mytable "
+            + "WHERE CRSDepTime BETWEEN 800 AND 1200 AND DivArrDelay < 100 "
+            + "GROUP BY CRSDepTime ORDER BY CRSDepTime"
+    };
+    for (String starQuery : starQueries) {
+      testStarQuery(starQuery, verifyPlan);
+    }
+  }
+
   private void testStarQuery(String starQuery, boolean verifyPlan)
       throws Exception {
     String explain = "EXPLAIN PLAN FOR ";
diff --git 
a/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema
 
b/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema
similarity index 98%
rename from 
pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema
rename to 
pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema
index b0b2e7c450b..dfdfa576328 100644
--- 
a/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema
+++ 
b/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema
@@ -211,7 +211,8 @@
     },
     {
       "name": "TotalAddGTime",
-      "dataType": "INT"
+      "dataType": "INT",
+      "singleValueField": false
     }
   ],
   "metricFieldSpecs": [
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgMVValueAggregator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgMVValueAggregator.java
new file mode 100644
index 00000000000..1ad4dbfaddd
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgMVValueAggregator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+/**
+ * Value aggregator for AVGMV aggregation function.
+ * This aggregator handles multi-value columns by computing the average across 
all values in all arrays.
+ */
+public class AvgMVValueAggregator extends AvgValueAggregator {
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.AVGMV;
+  }
+
+  /** Returns the initial pair: empty for null, deserialized for {@code byte[]}, else folded from the entries. */
+  @Override
+  public AvgPair getInitialAggregatedValue(@Nullable Object rawValue) {
+    if (rawValue == null) {
+      return new AvgPair();
+    }
+    if (rawValue instanceof byte[]) {
+      return deserializeAggregatedValue((byte[]) rawValue);
+    }
+    return processMultiValueArray(rawValue);
+  }
+
+  /**
+   * Merges a raw value into {@code value}; a null raw value is ignored, as in the COUNTMV and SUMMV aggregators.
+   */
+  @Override
+  public AvgPair applyRawValue(AvgPair value, Object rawValue) {
+    if (rawValue instanceof byte[]) {
+      value.apply(deserializeAggregatedValue((byte[]) rawValue));
+    } else if (rawValue != null) {
+      value.apply(processMultiValueArray(rawValue));
+    }
+    return value;
+  }
+
+  /**
+   * Folds {@code rawValue} into a new {@link AvgPair}: every non-null element of an {@code Object[]} is applied,
+   * while any other value is treated as one entry (count 1).
+   */
+  private AvgPair processMultiValueArray(Object rawValue) {
+    if (!(rawValue instanceof Object[])) {
+      return new AvgPair(ValueAggregatorUtils.toDouble(rawValue), 1L);
+    }
+    AvgPair avgPair = new AvgPair();
+    for (Object value : (Object[]) rawValue) {
+      if (value != null) {
+        avgPair.apply(ValueAggregatorUtils.toDouble(value));
+      }
+    }
+    return avgPair;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountMVValueAggregator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountMVValueAggregator.java
new file mode 100644
index 00000000000..7b26515fb18
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountMVValueAggregator.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+/**
+ * Value aggregator for COUNTMV aggregation function.
+ * This aggregator handles multi-value columns by counting all values in all 
arrays.
+ */
+public class CountMVValueAggregator extends CountValueAggregator {
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.COUNTMV;
+  }
+
+  /**
+   * Returns the entry count of the first raw value, or zero when that value is null.
+   */
+  @Override
+  public Long getInitialAggregatedValue(@Nullable Object rawValue) {
+    return rawValue == null ? 0L : countNonNullEntries(rawValue);
+  }
+
+  /** Adds the entry count of {@code rawValue} to {@code value}; a null raw value leaves it unchanged. */
+  @Override
+  public Long applyRawValue(Long value, Object rawValue) {
+    if (rawValue != null) {
+      return value + countNonNullEntries(rawValue);
+    }
+    return value;
+  }
+
+  /**
+   * Counts the non-null elements when {@code rawValue} is an {@code Object[]}; any other value counts as
+   * a single entry.
+   */
+  private long countNonNullEntries(Object rawValue) {
+    if (!(rawValue instanceof Object[])) {
+      return 1L;
+    }
+    Object[] entries = (Object[]) rawValue;
+    long nonNullCount = 0;
+    for (int i = 0; i < entries.length; i++) {
+      if (entries[i] != null) {
+        nonNullCount++;
+      }
+    }
+    return nonNullCount;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumMVValueAggregator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumMVValueAggregator.java
new file mode 100644
index 00000000000..19d3a208c2a
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumMVValueAggregator.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+/**
+ * Value aggregator for SUMMV aggregation function.
+ * This aggregator handles multi-value columns by summing all values in all 
arrays.
+ */
+public class SumMVValueAggregator extends SumValueAggregator {
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.SUMMV;
+  }
+
+  /**
+   * Produces the starting sum from the first raw value; a null raw value starts the sum at zero.
+   */
+  @Override
+  public Double getInitialAggregatedValue(@Nullable Object rawValue) {
+    return rawValue == null ? 0.0 : sumEntries(rawValue);
+  }
+
+  /** Adds the entries of {@code rawValue} to {@code value}; a null raw value leaves the sum unchanged. */
+  @Override
+  public Double applyRawValue(Double value, Object rawValue) {
+    if (rawValue != null) {
+      return value + sumEntries(rawValue);
+    }
+    return value;
+  }
+
+  /**
+   * Sums the non-null elements when {@code rawValue} is an {@code Object[]}; any other value is converted
+   * with {@code ValueAggregatorUtils.toDouble} and returned on its own.
+   */
+  private double sumEntries(Object rawValue) {
+    if (!(rawValue instanceof Object[])) {
+      return ValueAggregatorUtils.toDouble(rawValue);
+    }
+    Object[] entries = (Object[]) rawValue;
+    double total = 0.0;
+    for (int i = 0; i < entries.length; i++) {
+      if (entries[i] != null) {
+        total += ValueAggregatorUtils.toDouble(entries[i]);
+      }
+    }
+    return total;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index 4ada0134954..6e0d9fac708 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -44,16 +44,22 @@ public class ValueAggregatorFactory {
     switch (aggregationType) {
       case COUNT:
         return new CountValueAggregator();
+      case COUNTMV:
+        return new CountMVValueAggregator();
       case MIN:
         return new MinValueAggregator();
       case MAX:
         return new MaxValueAggregator();
       case SUM:
         return new SumValueAggregator();
+      case SUMMV:
+        return new SumMVValueAggregator();
       case SUMPRECISION:
         return new SumPrecisionValueAggregator(arguments);
       case AVG:
         return new AvgValueAggregator();
+      case AVGMV:
+        return new AvgMVValueAggregator();
       case MINMAXRANGE:
         return new MinMaxRangeValueAggregator();
       case DISTINCTCOUNTBITMAP:
@@ -99,16 +105,22 @@ public class ValueAggregatorFactory {
     switch (aggregationType) {
       case COUNT:
         return CountValueAggregator.AGGREGATED_VALUE_TYPE;
+      case COUNTMV:
+        return CountMVValueAggregator.AGGREGATED_VALUE_TYPE;
       case MIN:
         return MinValueAggregator.AGGREGATED_VALUE_TYPE;
       case MAX:
         return MaxValueAggregator.AGGREGATED_VALUE_TYPE;
       case SUM:
         return SumValueAggregator.AGGREGATED_VALUE_TYPE;
+      case SUMMV:
+        return SumMVValueAggregator.AGGREGATED_VALUE_TYPE;
       case SUMPRECISION:
         return SumPrecisionValueAggregator.AGGREGATED_VALUE_TYPE;
       case AVG:
         return AvgValueAggregator.AGGREGATED_VALUE_TYPE;
+      case AVGMV:
+        return AvgMVValueAggregator.AGGREGATED_VALUE_TYPE;
       case MINMAXRANGE:
         return MinMaxRangeValueAggregator.AGGREGATED_VALUE_TYPE;
       case DISTINCTCOUNTBITMAP:
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index fea18b4c9b4..1656009ddc2 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1281,7 +1282,7 @@ public final class TableConfigUtils {
-  /// - All referenced columns exist in the schema and are single-valued
+  /// - All referenced columns exist in the schema, and dimension columns are single-valued
   private static void validateStarTreeIndexConfigs(List<StarTreeIndexConfig> 
starTreeIndexConfigs,
       Map<String, FieldIndexConfigs> indexConfigsMap, Schema schema) {
-    Set<String> referencedColumns = new HashSet<>();
+    Set<String> dimensionColumns = new HashSet<>();
     for (StarTreeIndexConfig starTreeIndexConfig : starTreeIndexConfigs) {
       // Validate dimension columns are dictionary encoded
       List<String> dimensionsSplitOrder = 
starTreeIndexConfig.getDimensionsSplitOrder();
@@ -1292,7 +1293,7 @@ public final class TableConfigUtils {
             "Failed to find dimension column: %s specified in star-tree index 
config in schema", dimension);
         
Preconditions.checkState(indexConfigs.getConfig(StandardIndexes.dictionary()).isEnabled(),
             "Cannot create star-tree index on dimension column: %s without 
dictionary", dimension);
-        referencedColumns.add(dimension);
+        dimensionColumns.add(dimension);
       }
 
       // Validate 'dimensionsSplitOrder' contains all dimensions in 
'skipStarNodeCreationForDimensions'
@@ -1312,6 +1313,7 @@ public final class TableConfigUtils {
           "Either 'functionColumnPairs' or 'aggregationConfigs' must be 
specified, but not both");
       Set<AggregationFunctionColumnPair> functionColumnPairsSet = new 
HashSet<>();
       Set<AggregationFunctionColumnPair> storedTypes = new HashSet<>();
+      Set<String> aggregatedColumns = new HashSet<>();
       if (functionColumnPairs != null) {
         for (String functionColumnPair : functionColumnPairs) {
           AggregationFunctionColumnPair columnPair;
@@ -1333,7 +1335,7 @@ public final class TableConfigUtils {
           }
           String column = columnPair.getColumn();
           if (!column.equals(AggregationFunctionColumnPair.STAR)) {
-            referencedColumns.add(column);
+              aggregatedColumns.add(column);
           } else if (columnPair.getFunctionType() != 
AggregationFunctionType.COUNT) {
             throw new IllegalStateException("Non-COUNT function set the column 
as '*' in the functionColumnPair: "
                 + functionColumnPair + ". Please configure an actual column 
for the function");
@@ -1362,7 +1364,7 @@ public final class TableConfigUtils {
           }
           String column = columnPair.getColumn();
           if (!column.equals(AggregationFunctionColumnPair.STAR)) {
-            referencedColumns.add(column);
+              aggregatedColumns.add(column);
           } else if (columnPair.getFunctionType() != 
AggregationFunctionType.COUNT) {
             throw new IllegalStateException("Non-COUNT function set the column 
as '*' in the aggregationConfig for "
                 + "function: " + aggregationConfig.getAggregationFunction()
@@ -1371,16 +1373,19 @@ public final class TableConfigUtils {
         }
       }
 
-      // Validate all referenced columns exist in the schema and are 
single-valued
-      for (String column : referencedColumns) {
+      for (String column : Iterables.concat(dimensionColumns, 
aggregatedColumns)) {
         FieldSpec fieldSpec = schema.getFieldSpecFor(column);
         Preconditions.checkState(fieldSpec != null,
             "Failed to find column: %s specified in star-tree index config in 
schema", column);
-        Preconditions.checkState(fieldSpec.isSingleValueField(),
-            "Star-tree index can only be created on single-value columns, but 
found multi-value column: %s", column);
         Preconditions.checkState(fieldSpec.getDataType() != DataType.MAP,
             "Star-tree index cannot be created on MAP column: %s", column);
       }
+
+      for (String column : dimensionColumns) {
+        FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+        Preconditions.checkState(fieldSpec.isSingleValueField(),
+            "Star-tree dimension columns must be single-value, but found 
multi-value column: %s", column);
+      }
     }
   }
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
index d983ce259c7..f98fac81a8c 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
@@ -41,12 +41,15 @@ public class ValueAggregatorTest {
   public static Object[][] fixedSizeAggregatedValue() {
     return new Object[][]{
         {AggregationFunctionType.COUNT, List.of(), true},
+        {AggregationFunctionType.COUNTMV, List.of(), true},
         {AggregationFunctionType.MIN, List.of(), true},
         {AggregationFunctionType.MAX, List.of(), true},
         {AggregationFunctionType.SUM, List.of(), true},
+        {AggregationFunctionType.SUMMV, List.of(), true},
         {AggregationFunctionType.SUMPRECISION, List.of(), false},
         {AggregationFunctionType.SUMPRECISION, 
List.of(ExpressionContext.forLiteral(Literal.intValue(20))), true},
         {AggregationFunctionType.AVG, List.of(), true},
+        {AggregationFunctionType.AVGMV, List.of(), true},
         {AggregationFunctionType.MINMAXRANGE, List.of(), true},
         {AggregationFunctionType.DISTINCTCOUNTBITMAP, List.of(), false},
         {AggregationFunctionType.DISTINCTCOUNTHLL, List.of(), true},
diff --git 
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
 
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
index 856719e0582..5558a95609a 100644
--- 
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
@@ -66,6 +66,17 @@
           "MAX__ArrDelay"
         ],
         "maxLeafRecords": 10
+      },
+      {
+        "dimensionsSplitOrder": [
+          "AirlineID"
+        ],
+        "functionColumnPairs": [
+          "AVGMV__DivLongestGTimes",
+          "COUNTMV__DivLongestGTimes",
+          "SUMMV__DivLongestGTimes"
+        ],
+        "maxLeafRecords": 10
       }
     ],
     "enableDynamicStarTreeCreation": true,
diff --git 
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
 
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
index 563e5caa196..703e3e27160 100644
--- 
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
+++ 
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
@@ -188,7 +188,7 @@
       "singleValueField": false
     },
     {
-      "dataType": "INT",
+      "dataType": "LONG",
       "name": "DivTotalGTimes",
       "singleValueField": false
     },


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to