This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e258c7d0d62 Add MV support to LONG specific aggregation functions (#17007)
e258c7d0d62 is described below

commit e258c7d0d622c9ff7ed505762f2992ee18a2c4e6
Author: Yash Mayya <[email protected]>
AuthorDate: Mon Oct 13 17:26:05 2025 -0700

    Add MV support to LONG specific aggregation functions (#17007)
---
 .../function/MaxLongAggregationFunction.java       | 127 ++++++++++++++++++--
 .../function/MinLongAggregationFunction.java       | 128 +++++++++++++++++++--
 .../function/SumLongAggregationFunction.java       | 121 ++++++++++++++++++-
 .../function/MaxLongAggregationFunctionTest.java   |  67 +++++++++++
 .../function/MinLongAggregationFunctionTest.java   |  74 ++++++++++++
 .../function/SumLongAggregationFunctionTest.java   |  76 ++++++++++++
 .../tests/OfflineClusterIntegrationTest.java       |  16 +++
 .../org/apache/pinot/query/type/TypeSystem.java    |   5 +
 .../pinot/query/QueryEnvironmentTestBase.java      |   8 +-
 .../pinot/segment/spi/AggregationFunctionType.java |  25 +++-
 10 files changed, 621 insertions(+), 26 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
index e01b0eeb22e..029d630546b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
@@ -68,17 +68,37 @@ public class MaxLongAggregationFunction extends NullableSingleInputAggregationFu
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
-    long[] values = blockValSet.getLongValuesSV();
 
-    Long max = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
-      long innerMax = values[from];
-      for (int i = from; i < to; i++) {
-        innerMax = Math.max(innerMax, values[i]);
-      }
-      return acum == null ? innerMax : Math.max(acum, innerMax);
-    });
+    if (blockValSet.isSingleValue()) {
+      long[] values = blockValSet.getLongValuesSV();
+
+      Long max = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+        long innerMax = values[from];
+        for (int i = from; i < to; i++) {
+          innerMax = Math.max(innerMax, values[i]);
+        }
+        return acum == null ? innerMax : Math.max(acum, innerMax);
+      });
+
+      updateAggregationResultHolder(aggregationResultHolder, max);
+    } else {
+      long[][] valuesArray = blockValSet.getLongValuesMV();
+
+      Long max = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+        long innerMax = DEFAULT_INITIAL_VALUE;
+        for (int i = from; i < to; i++) {
+          long[] values = valuesArray[i];
+          for (long value : values) {
+            if (value > innerMax) {
+              innerMax = value;
+            }
+          }
+        }
+        return acum == null ? innerMax : Math.max(acum, innerMax);
+      });
 
-    updateAggregationResultHolder(aggregationResultHolder, max);
+      updateAggregationResultHolder(aggregationResultHolder, max);
+    }
   }
 
   protected void updateAggregationResultHolder(AggregationResultHolder 
aggregationResultHolder, Long max) {
@@ -97,6 +117,16 @@ public class MaxLongAggregationFunction extends NullableSingleInputAggregationFu
   public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.isSingleValue()) {
+      aggregateSvGroupBySV(blockValSet, length, groupKeyArray, 
groupByResultHolder);
+    } else {
+      aggregateMvGroupBySV(blockValSet, length, groupKeyArray, 
groupByResultHolder);
+    }
+  }
+
+  private void aggregateSvGroupBySV(BlockValSet blockValSet, int length, int[] 
groupKeyArray,
+      GroupByResultHolder groupByResultHolder) {
     long[] valueArray = blockValSet.getLongValuesSV();
 
     if (_nullHandlingEnabled) {
@@ -121,10 +151,51 @@ public class MaxLongAggregationFunction extends NullableSingleInputAggregationFu
     }
   }
 
+  private void aggregateMvGroupBySV(BlockValSet blockValSet, int length, int[] 
groupKeyArray,
+      GroupByResultHolder groupByResultHolder) {
+    long[][] valuesArray = blockValSet.getLongValuesMV();
+
+    if (_nullHandlingEnabled) {
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          int groupKey = groupKeyArray[i];
+          Long max = groupByResultHolder.getResult(groupKey);
+          for (long value : valuesArray[i]) {
+            if (max == null || value > max) {
+              max = value;
+            }
+          }
+          groupByResultHolder.setValueForKey(groupKey, max);
+        }
+      });
+    } else {
+      for (int i = 0; i < length; i++) {
+        int groupKey = groupKeyArray[i];
+        long max = groupByResultHolder.getLongResult(groupKey);
+        for (long value : valuesArray[i]) {
+          if (value > max) {
+            max = value;
+          }
+        }
+        groupByResultHolder.setValueForKey(groupKey, max);
+      }
+    }
+  }
+
   @Override
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.isSingleValue()) {
+      aggregateSvGroupByMV(blockValSet, length, groupKeysArray, 
groupByResultHolder);
+    } else {
+      aggregateMvGroupByMV(blockValSet, length, groupKeysArray, 
groupByResultHolder);
+    }
+  }
+
+  private void aggregateSvGroupByMV(BlockValSet blockValSet, int length, 
int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder) {
     long[] valueArray = blockValSet.getLongValuesSV();
 
     if (_nullHandlingEnabled) {
@@ -151,6 +222,44 @@ public class MaxLongAggregationFunction extends NullableSingleInputAggregationFu
     }
   }
 
+  private void aggregateMvGroupByMV(BlockValSet blockValSet, int length, 
int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder) {
+    long[][] valuesArray = blockValSet.getLongValuesMV();
+
+    if (_nullHandlingEnabled) {
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          Long max = null;
+          for (long value : valuesArray[i]) {
+            if (max == null || value > max) {
+              max = value;
+            }
+          }
+
+          for (int groupKey : groupKeysArray[i]) {
+            Long currentMax = groupByResultHolder.getResult(groupKey);
+            if (currentMax == null || (max != null && max > currentMax)) {
+              groupByResultHolder.setValueForKey(groupKey, max);
+            }
+          }
+        }
+      });
+    } else {
+      for (int i = 0; i < length; i++) {
+        long[] values = valuesArray[i];
+        for (int groupKey : groupKeysArray[i]) {
+          long max = groupByResultHolder.getLongResult(groupKey);
+          for (long value : values) {
+            if (value > max) {
+              max = value;
+            }
+          }
+          groupByResultHolder.setValueForKey(groupKey, max);
+        }
+      }
+    }
+  }
+
   @Override
   public Long extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
     if (_nullHandlingEnabled) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
index c48d222abf8..159e543468a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
@@ -68,17 +68,37 @@ public class MinLongAggregationFunction extends NullableSingleInputAggregationFu
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
-    long[] values = blockValSet.getLongValuesSV();
 
-    Long min = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
-      long innerMin = values[from];
-      for (int i = from; i < to; i++) {
-        innerMin = Math.min(innerMin, values[i]);
-      }
-      return acum == null ? innerMin : Math.min(acum, innerMin);
-    });
+    if (blockValSet.isSingleValue()) {
+      long[] values = blockValSet.getLongValuesSV();
+
+      Long min = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+        long innerMin = values[from];
+        for (int i = from; i < to; i++) {
+          innerMin = Math.min(innerMin, values[i]);
+        }
+        return acum == null ? innerMin : Math.min(acum, innerMin);
+      });
 
-    updateAggregationResultHolder(aggregationResultHolder, min);
+      updateAggregationResultHolder(aggregationResultHolder, min);
+    } else {
+      long[][] valuesArray = blockValSet.getLongValuesMV();
+
+      Long min = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+        long innerMin = DEFAULT_VALUE;
+        for (int i = from; i < to; i++) {
+          long[] values = valuesArray[i];
+          for (long value : values) {
+            if (value < innerMin) {
+              innerMin = value;
+            }
+          }
+        }
+        return acum == null ? innerMin : Math.min(acum, innerMin);
+      });
+
+      updateAggregationResultHolder(aggregationResultHolder, min);
+    }
   }
 
   protected void updateAggregationResultHolder(AggregationResultHolder 
aggregationResultHolder, Long min) {
@@ -97,8 +117,17 @@ public class MinLongAggregationFunction extends NullableSingleInputAggregationFu
   public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
-    long[] valueArray = blockValSet.getLongValuesSV();
 
+    if (blockValSet.isSingleValue()) {
+      aggregateSvGroupBySv(blockValSet, length, groupKeyArray, 
groupByResultHolder);
+    } else {
+      aggregateMvGroupBySv(blockValSet, length, groupKeyArray, 
groupByResultHolder);
+    }
+  }
+
+  private void aggregateSvGroupBySv(BlockValSet blockValSet, int length, int[] 
groupKeyArray,
+      GroupByResultHolder groupByResultHolder) {
+    long[] valueArray = blockValSet.getLongValuesSV();
     if (_nullHandlingEnabled) {
       forEachNotNull(length, blockValSet, (from, to) -> {
         for (int i = from; i < to; i++) {
@@ -121,10 +150,51 @@ public class MinLongAggregationFunction extends NullableSingleInputAggregationFu
     }
   }
 
+  private void aggregateMvGroupBySv(BlockValSet blockValSet, int length, int[] 
groupKeyArray,
+      GroupByResultHolder groupByResultHolder) {
+    long[][] valuesArray = blockValSet.getLongValuesMV();
+
+    if (_nullHandlingEnabled) {
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          int groupKey = groupKeyArray[i];
+          Long min = groupByResultHolder.getResult(groupKey);
+          for (long value : valuesArray[i]) {
+            if (min == null || value < min) {
+              min = value;
+            }
+          }
+          groupByResultHolder.setValueForKey(groupKey, min);
+        }
+      });
+    } else {
+      for (int i = 0; i < length; i++) {
+        int groupKey = groupKeyArray[i];
+        long min = groupByResultHolder.getLongResult(groupKey);
+        for (long value : valuesArray[i]) {
+          if (value < min) {
+            min = value;
+          }
+        }
+        groupByResultHolder.setValueForKey(groupKey, min);
+      }
+    }
+  }
+
   @Override
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.isSingleValue()) {
+      aggregateSvGroupByMv(blockValSet, length, groupKeysArray, 
groupByResultHolder);
+    } else {
+      aggregateMvGroupByMv(blockValSet, length, groupKeysArray, 
groupByResultHolder);
+    }
+  }
+
+  private void aggregateSvGroupByMv(BlockValSet blockValSet, int length, 
int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder) {
     long[] valueArray = blockValSet.getLongValuesSV();
 
     if (_nullHandlingEnabled) {
@@ -151,6 +221,44 @@ public class MinLongAggregationFunction extends NullableSingleInputAggregationFu
     }
   }
 
+  private void aggregateMvGroupByMv(BlockValSet blockValSet, int length, 
int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder) {
+    long[][] valuesArray = blockValSet.getLongValuesMV();
+
+    if (_nullHandlingEnabled) {
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          Long min = null;
+          for (long value : valuesArray[i]) {
+            if (min == null || value < min) {
+              min = value;
+            }
+          }
+
+          for (int groupKey : groupKeysArray[i]) {
+            Long currentMin = groupByResultHolder.getResult(groupKey);
+            if (currentMin == null || (min != null && min < currentMin)) {
+              groupByResultHolder.setValueForKey(groupKey, min);
+            }
+          }
+        }
+      });
+    } else {
+      for (int i = 0; i < length; i++) {
+        long[] values = valuesArray[i];
+        for (int groupKey : groupKeysArray[i]) {
+          long min = groupByResultHolder.getLongResult(groupKey);
+          for (long value : values) {
+            if (value < min) {
+              min = value;
+            }
+          }
+          groupByResultHolder.setValueForKey(groupKey, min);
+        }
+      }
+    }
+  }
+
   @Override
   public Long extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
     if (_nullHandlingEnabled) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
index 5f142acad0a..61b0b223079 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.request.context.ExpressionContext;
@@ -79,11 +80,18 @@ public class SumLongAggregationFunction extends NullableSingleInputAggregationFu
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+    Preconditions.checkArgument(blockValSet.getValueType().getStoredType() == 
DataType.LONG
+            || blockValSet.getValueType().getStoredType() == DataType.INT,
+        "SumLongAggregationFunction only supports integer type columns");
 
-    if (blockValSet.getValueType().getStoredType() != DataType.LONG) {
-      throw new IllegalArgumentException("SumLongAggregationFunction only 
supports LONG columns");
+    if (blockValSet.isSingleValue()) {
+      aggregateSv(blockValSet, length, aggregationResultHolder);
+    } else {
+      aggregateMv(blockValSet, length, aggregationResultHolder);
     }
+  }
 
+  private void aggregateSv(BlockValSet blockValSet, int length, 
AggregationResultHolder aggregationResultHolder) {
     long[] values = blockValSet.getLongValuesSV();
 
     // Use foldNotNull with null as initial value - this will return null if 
no non-null values are processed
@@ -98,6 +106,23 @@ public class SumLongAggregationFunction extends NullableSingleInputAggregationFu
     updateAggregationResultHolder(aggregationResultHolder, sum);
   }
 
+  private void aggregateMv(BlockValSet blockValSet, int length, 
AggregationResultHolder aggregationResultHolder) {
+    long[][] valuesArray = blockValSet.getLongValuesMV();
+
+    Long sum;
+    sum = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+      long innerSum = 0;
+      for (int i = from; i < to; i++) {
+        for (long value : valuesArray[i]) {
+          innerSum += value;
+        }
+      }
+      return acum == null ? innerSum : acum + innerSum;
+    });
+
+    updateAggregationResultHolder(aggregationResultHolder, sum);
+  }
+
   private void updateAggregationResultHolder(AggregationResultHolder 
aggregationResultHolder, Long sum) {
     if (sum != null) {
       if (_nullHandlingEnabled) {
@@ -114,6 +139,19 @@ public class SumLongAggregationFunction extends NullableSingleInputAggregationFu
   public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+    Preconditions.checkArgument(blockValSet.getValueType().getStoredType() == 
DataType.LONG
+            || blockValSet.getValueType().getStoredType() == DataType.INT,
+        "SumLongAggregationFunction only supports integer type columns");
+
+    if (blockValSet.isSingleValue()) {
+      aggregateSvGroupBySv(blockValSet, length, groupKeyArray, 
groupByResultHolder);
+    } else {
+      aggregateMvGroupBySv(blockValSet, length, groupKeyArray, 
groupByResultHolder);
+    }
+  }
+
+  private void aggregateSvGroupBySv(BlockValSet blockValSet, int length, int[] 
groupKeyArray,
+      GroupByResultHolder groupByResultHolder) {
     long[] values = blockValSet.getLongValuesSV();
 
     if (_nullHandlingEnabled) {
@@ -135,10 +173,54 @@ public class SumLongAggregationFunction extends NullableSingleInputAggregationFu
     }
   }
 
+  private void aggregateMvGroupBySv(BlockValSet blockValSet, int length, int[] 
groupKeyArray,
+      GroupByResultHolder groupByResultHolder) {
+    long[][] valuesArray = blockValSet.getLongValuesMV();
+
+    if (_nullHandlingEnabled) {
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          int groupKey = groupKeyArray[i];
+          if (valuesArray[i].length > 0) {
+            // "i" has to be non-null here so we can use the default value as 
the initial value instead of null
+            long sum = DEFAULT_VALUE;
+            for (long value : valuesArray[i]) {
+              sum += value;
+            }
+            Long result = groupByResultHolder.getResult(groupKey);
+            groupByResultHolder.setValueForKey(groupKey, result == null ? sum 
: result + sum);
+          }
+        }
+      });
+    } else {
+      for (int i = 0; i < length; i++) {
+        int groupKey = groupKeyArray[i];
+        long sum = groupByResultHolder.getLongResult(groupKey);
+        for (long value : valuesArray[i]) {
+          sum += value;
+        }
+        groupByResultHolder.setValueForKey(groupKey, sum);
+      }
+    }
+  }
+
   @Override
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+    Preconditions.checkArgument(blockValSet.getValueType().getStoredType() == 
DataType.LONG
+            || blockValSet.getValueType().getStoredType() == DataType.INT,
+        "SumLongAggregationFunction only supports integer type columns");
+
+    if (blockValSet.isSingleValue()) {
+      aggregateSvGroupByMv(blockValSet, length, groupKeysArray, 
groupByResultHolder);
+    } else {
+      aggregateMvGroupByMv(blockValSet, length, groupKeysArray, 
groupByResultHolder);
+    }
+  }
+
+  private void aggregateSvGroupByMv(BlockValSet blockValSet, int length, 
int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder) {
     long[] values = blockValSet.getLongValuesSV();
 
     if (_nullHandlingEnabled) {
@@ -163,6 +245,41 @@ public class SumLongAggregationFunction extends NullableSingleInputAggregationFu
     }
   }
 
+  private void aggregateMvGroupByMv(BlockValSet blockValSet, int length, 
int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder) {
+    long[][] valuesArray = blockValSet.getLongValuesMV();
+
+    if (_nullHandlingEnabled) {
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          long[] values = valuesArray[i];
+          if (values.length > 0) {
+            // "i" has to be non-null here so we can use the default value as 
the initial value instead of null
+            long sum = DEFAULT_VALUE;
+            for (long value : values) {
+              sum += value;
+            }
+            for (int groupKey : groupKeysArray[i]) {
+              Long result = groupByResultHolder.getResult(groupKey);
+              groupByResultHolder.setValueForKey(groupKey, result == null ? 
sum : result + sum);
+            }
+          }
+        }
+      });
+    } else {
+      for (int i = 0; i < length; i++) {
+        long[] values = valuesArray[i];
+        for (int groupKey : groupKeysArray[i]) {
+          long sum = groupByResultHolder.getLongResult(groupKey);
+          for (long value : values) {
+            sum += value;
+          }
+          groupByResultHolder.setValueForKey(groupKey, sum);
+        }
+      }
+    }
+  }
+
   @Override
   public Long extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
     if (_nullHandlingEnabled) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
index fb0282e5c21..5bcbdc00ebd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
@@ -193,4 +193,71 @@ public class MaxLongAggregationFunctionTest extends AbstractAggregationFunctionT
         ).whenQuery("select maxlong(myField) from testTable")
         .thenResultIs("LONG", "" + (Long.MAX_VALUE - 1));
   }
+
+  @Test
+  public void aggregationMV() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"1;2;3"}
+        )
+        .andOnSecondInstance(
+            new Object[]{"null"}
+        )
+        .whenQuery("select maxlong(mv) from testTable")
+        .thenResultIs("LONG", "3")
+        .whenQueryWithNullHandlingEnabled("select maxlong(mv) from testTable")
+        .thenResultIs("LONG", "3");
+  }
+
+  @Test
+  public void aggregationMVGroupBySV() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+                .addSingleValueDimension("sv", FieldSpec.DataType.STRING)
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"null", "k1"},
+            new Object[]{"1;2;3", "k2"}
+        )
+        .andOnSecondInstance(
+            new Object[]{"null", "k2"},
+            new Object[]{"1;2;3", "k1"}
+        )
+        .whenQuery("select maxlong(mv) from testTable group by sv")
+        .thenResultIs("LONG", "3", "3")
+        .whenQueryWithNullHandlingEnabled("select maxlong(mv) from testTable 
group by sv")
+        .thenResultIs("LONG", "3", "3");
+  }
+
+  @Test
+  public void aggregationMVGroupByMV() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("mv1", FieldSpec.DataType.LONG)
+                .addMultiValueDimension("mv2", FieldSpec.DataType.STRING)
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"1;2", "k1;k2"}
+        )
+        .andOnSecondInstance(
+            new Object[]{"null", "k1;k2"}
+        )
+        .whenQuery("select maxlong(mv1) from testTable group by mv2")
+        .thenResultIs("LONG", "2", "2")
+        .whenQueryWithNullHandlingEnabled("select maxlong(mv1) from testTable 
group by mv2")
+        .thenResultIs("LONG", "2", "2");
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
index cea3320314a..430c3e4e2a8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
@@ -195,4 +195,78 @@ public class MinLongAggregationFunctionTest extends AbstractAggregationFunctionT
         ).whenQuery("select minlong(myField) from testTable")
         .thenResultIs("LONG", "" + (Long.MAX_VALUE - 5));
   }
+
+  @Test
+  public void aggregationMV() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"1;2;3"}
+        )
+        .andOnSecondInstance(
+            new Object[]{"null"}
+        )
+        .whenQuery("select minlong(mv) from testTable")
+        .thenResultIs("LONG",
+            
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
FieldSpec.DataType.LONG, null)))
+        .whenQueryWithNullHandlingEnabled("select minlong(mv) from testTable")
+        .thenResultIs("LONG", "1");
+  }
+
+  @Test
+  public void aggregationMVGroupBySV() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+                .addSingleValueDimension("sv", FieldSpec.DataType.STRING)
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"null", "k1"},
+            new Object[]{"1;2;3", "k2"}
+        )
+        .andOnSecondInstance(
+            new Object[]{"null", "k2"},
+            new Object[]{"1;2;3", "k1"}
+        )
+        .whenQuery("select minlong(mv) from testTable group by sv")
+        .thenResultIs("LONG", String.valueOf(
+            ((Number) 
FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
FieldSpec.DataType.LONG,
+                null)).longValue()), String.valueOf(
+            ((Number) 
FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
FieldSpec.DataType.LONG,
+                null)).longValue()))
+        .whenQueryWithNullHandlingEnabled("select minlong(mv) from testTable 
group by sv")
+        .thenResultIs("LONG", "1", "1");
+  }
+
+  @Test
+  public void aggregationMVGroupByMV() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("mv1", FieldSpec.DataType.LONG)
+                .addMultiValueDimension("mv2", FieldSpec.DataType.STRING)
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"1;2", "k1;k2"}
+        )
+        .andOnSecondInstance(
+            new Object[]{"null", "k1;k2"}
+        )
+        .whenQuery("select minlong(mv1) from testTable group by mv2")
+        .thenResultIs("LONG",
+            
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
FieldSpec.DataType.LONG, null)),
+            
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
FieldSpec.DataType.LONG, null)))
+        .whenQueryWithNullHandlingEnabled("select minlong(mv1) from testTable 
group by mv2")
+        .thenResultIs("LONG", "1", "1");
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
index 5bc868b225c..dd6bf22d4e5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
@@ -176,4 +176,80 @@ public class SumLongAggregationFunctionTest extends 
AbstractAggregationFunctionT
             "tag3    | null"
         );
   }
+
+  @Test
+  public void aggregationMV() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"1;2;3"}
+        )
+        .andOnSecondInstance(
+            new Object[]{"null"}
+        )
+        .whenQuery("select sumlong(mv) from testTable")
+        .thenResultIs("LONG", String.valueOf(
+            1 + 2 + 3 + (long) FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, FieldSpec.DataType.LONG,
+                null)))
+        .whenQueryWithNullHandlingEnabled("select sumlong(mv) from testTable")
+        .thenResultIs("LONG",
+            String.valueOf(1 + 2 + 3));
+  }
+
+  @Test
+  public void aggregationMVGroupBySV() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+                .addSingleValueDimension("sv", FieldSpec.DataType.STRING)
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"null", "k1"},
+            new Object[]{"1;2;3", "k2"}
+        )
+        .andOnSecondInstance(
+            new Object[]{"null", "k2"},
+            new Object[]{"1;2;3", "k1"}
+        )
+        .whenQuery("select sumlong(mv) from testTable group by sv")
+        .thenResultIs("LONG", String.valueOf(
+                6 + (long) FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, FieldSpec.DataType.LONG, null)),
+            String.valueOf(
+                6 + (long) FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, FieldSpec.DataType.LONG, null)))
+        .whenQueryWithNullHandlingEnabled("select sumlong(mv) from testTable group by sv")
+        .thenResultIs("LONG", "6", "6");
+  }
+
+  @Test
+  public void aggregationMVGroupByMV() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("mv1", FieldSpec.DataType.LONG)
+                .addMultiValueDimension("mv2", FieldSpec.DataType.STRING)
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"1;2", "k1;k2"}
+        )
+        .andOnSecondInstance(
+            new Object[]{"null", "k1;k2"}
+        )
+        .whenQuery("select sumlong(mv1) from testTable group by mv2")
+        .thenResultIs("LONG", String.valueOf(
+                3 + (long) FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, FieldSpec.DataType.LONG, null)),
+            String.valueOf(
+                3 + (long) FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, FieldSpec.DataType.LONG, null)))
+        .whenQueryWithNullHandlingEnabled("select sumlong(mv1) from testTable group by mv2")
+        .thenResultIs("LONG", "3", "3");
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index c019e5c3aa5..d32ed0290ca 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -3070,6 +3070,22 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
   }
 
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testMvLongAggregations(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = "SELECT sumlong(DivTotalGTimes), minlong(DivTotalGTimes), maxlong(DivTotalGTimes) FROM mytable";
+    JsonNode response = postQuery(query);
+    assertNoError(response);
+    JsonNode resultTable = response.get("resultTable");
+    JsonNode dataSchema = resultTable.get("dataSchema");
+    assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"LONG\",\"LONG\",\"LONG\"]");
+    JsonNode rows = resultTable.get("rows");
+    assertEquals(rows.size(), 1);
+    JsonNode row = rows.get(0);
+    assertEquals(row.size(), 3);
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
index b40f913c4e8..8f479b19c7b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
@@ -109,6 +109,11 @@ public class TypeSystem extends RelDataTypeSystemImpl {
 
   @Override
   public RelDataType deriveSumType(RelDataTypeFactory typeFactory, RelDataType argumentType) {
+    if (argumentType.getComponentType() != null) {
+      // For MV columns, the return type for SUM is the same as the return type for SUM on the individual element type.
+      return deriveSumType(typeFactory, argumentType.getComponentType());
+    }
+
     assert SqlTypeUtil.isNumeric(argumentType);
     switch (argumentType.getSqlTypeName()) {
       case TINYINT:
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 2d4bdd9a41f..7a0d8c23028 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -79,7 +79,9 @@ public class QueryEnvironmentTestBase {
     TABLE_SCHEMAS.put("c_OFFLINE", getSchemaBuilder("c").build());
     TABLE_SCHEMAS.put("d", getSchemaBuilder("d").build());
     TABLE_SCHEMAS.put("e", getSchemaBuilder("e")
-        .addMultiValueDimension("mcol1", FieldSpec.DataType.STRING).build());
+        .addMultiValueDimension("mcol1", FieldSpec.DataType.STRING)
+        .addMultiValueDimension("mcol2", FieldSpec.DataType.LONG)
+        .build());
   }
 
   static Schema.SchemaBuilder getSchemaBuilder(String schemaName) {
@@ -286,7 +288,9 @@ public class QueryEnvironmentTestBase {
         // Verify type coercion in standard functions
         new Object[]{"SELECT DATEADD('DAY', 1, col7) FROM a"},
         new Object[]{"SELECT TIMESTAMPADD(DAY, 10, NOW() - 100) FROM a"},
-        new Object[]{"SELECT ts FROM a WHERE ts <= '2025-08-14 00:00:00.000000'"}
+        new Object[]{"SELECT ts FROM a WHERE ts <= '2025-08-14 00:00:00.000000'"},
+        // Aggregations on MV LONG columns
+        new Object[]{"SELECT SUMLONG(mcol2), MINLONG(mcol2), MAXLONG(mcol2) FROM e"}
     };
   }
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 614d8650749..93f6b2755fd 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -55,12 +55,12 @@ public enum AggregationFunctionType {
   MAX("max", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
   MINSTRING("minString", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, OperandTypes.CHARACTER),
   MAXSTRING("maxString", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, OperandTypes.CHARACTER),
-  MINLONG("minLong", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, OperandTypes.INTEGER),
-  MAXLONG("maxLong", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, OperandTypes.INTEGER),
+  MINLONG("minLong", new BigintNullableIfEmpty(), OperandTypes.or(OperandTypes.INTEGER, OperandTypes.ARRAY_OF_INTEGER)),
+  MAXLONG("maxLong", new BigintNullableIfEmpty(), OperandTypes.or(OperandTypes.INTEGER, OperandTypes.ARRAY_OF_INTEGER)),
   SUM("sum", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
   SUM0("$sum0", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
   SUMINT("sumInt", ReturnTypes.AGG_SUM, OperandTypes.INTEGER),
-  SUMLONG("sumLong", ReturnTypes.AGG_SUM, OperandTypes.INTEGER),
+  SUMLONG("sumLong", ReturnTypes.AGG_SUM, OperandTypes.or(OperandTypes.INTEGER, OperandTypes.ARRAY_OF_INTEGER)),
   SUMPRECISION("sumPrecision", ReturnTypes.explicit(SqlTypeName.DECIMAL), OperandTypes.ANY, SqlTypeName.OTHER),
   AVG("avg", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
   MODE("mode", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
@@ -400,4 +400,23 @@ public enum AggregationFunctionType {
       return typeFactory.createArrayType(elementType, -1);
     }
   }
+
+  // Used for aggregation functions that always return BIGINT. The "IfEmpty" logic ensures that the return type is
+  // nullable for pure aggregation queries (no group-by) and filtered aggregation queries. Return values can be null
+  // if there are no matching rows (even if the operand type is not nullable).
+  private static class BigintNullableIfEmpty implements SqlReturnTypeInference {
+    @Override
+    public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+      RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+      if (opBinding.getGroupCount() == 0 || opBinding.hasFilter()) {
+        return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true);
+      } else {
+        if (opBinding.getOperandType(0).isNullable()) {
+          return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true);
+        } else {
+          return typeFactory.createSqlType(SqlTypeName.BIGINT);
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to