This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e258c7d0d62 Add MV support to LONG specific aggregation functions (#17007)
e258c7d0d62 is described below
commit e258c7d0d622c9ff7ed505762f2992ee18a2c4e6
Author: Yash Mayya <[email protected]>
AuthorDate: Mon Oct 13 17:26:05 2025 -0700
Add MV support to LONG specific aggregation functions (#17007)
---
.../function/MaxLongAggregationFunction.java | 127 ++++++++++++++++++--
.../function/MinLongAggregationFunction.java | 128 +++++++++++++++++++--
.../function/SumLongAggregationFunction.java | 121 ++++++++++++++++++-
.../function/MaxLongAggregationFunctionTest.java | 67 +++++++++++
.../function/MinLongAggregationFunctionTest.java | 74 ++++++++++++
.../function/SumLongAggregationFunctionTest.java | 76 ++++++++++++
.../tests/OfflineClusterIntegrationTest.java | 16 +++
.../org/apache/pinot/query/type/TypeSystem.java | 5 +
.../pinot/query/QueryEnvironmentTestBase.java | 8 +-
.../pinot/segment/spi/AggregationFunctionType.java | 25 +++-
10 files changed, 621 insertions(+), 26 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
index e01b0eeb22e..029d630546b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
@@ -68,17 +68,37 @@ public class MaxLongAggregationFunction extends
NullableSingleInputAggregationFu
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- long[] values = blockValSet.getLongValuesSV();
- Long max = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
- long innerMax = values[from];
- for (int i = from; i < to; i++) {
- innerMax = Math.max(innerMax, values[i]);
- }
- return acum == null ? innerMax : Math.max(acum, innerMax);
- });
+ if (blockValSet.isSingleValue()) {
+ long[] values = blockValSet.getLongValuesSV();
+
+ Long max = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+ long innerMax = values[from];
+ for (int i = from; i < to; i++) {
+ innerMax = Math.max(innerMax, values[i]);
+ }
+ return acum == null ? innerMax : Math.max(acum, innerMax);
+ });
+
+ updateAggregationResultHolder(aggregationResultHolder, max);
+ } else {
+ long[][] valuesArray = blockValSet.getLongValuesMV();
+
+ Long max = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+ long innerMax = DEFAULT_INITIAL_VALUE;
+ for (int i = from; i < to; i++) {
+ long[] values = valuesArray[i];
+ for (long value : values) {
+ if (value > innerMax) {
+ innerMax = value;
+ }
+ }
+ }
+ return acum == null ? innerMax : Math.max(acum, innerMax);
+ });
- updateAggregationResultHolder(aggregationResultHolder, max);
+ updateAggregationResultHolder(aggregationResultHolder, max);
+ }
}
protected void updateAggregationResultHolder(AggregationResultHolder
aggregationResultHolder, Long max) {
@@ -97,6 +117,16 @@ public class MaxLongAggregationFunction extends
NullableSingleInputAggregationFu
public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ if (blockValSet.isSingleValue()) {
+ aggregateSvGroupBySV(blockValSet, length, groupKeyArray,
groupByResultHolder);
+ } else {
+ aggregateMvGroupBySV(blockValSet, length, groupKeyArray,
groupByResultHolder);
+ }
+ }
+
+ private void aggregateSvGroupBySV(BlockValSet blockValSet, int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder) {
long[] valueArray = blockValSet.getLongValuesSV();
if (_nullHandlingEnabled) {
@@ -121,10 +151,51 @@ public class MaxLongAggregationFunction extends
NullableSingleInputAggregationFu
}
}
+ private void aggregateMvGroupBySV(BlockValSet blockValSet, int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder) {
+ long[][] valuesArray = blockValSet.getLongValuesMV();
+
+ if (_nullHandlingEnabled) {
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ int groupKey = groupKeyArray[i];
+ Long max = groupByResultHolder.getResult(groupKey);
+ for (long value : valuesArray[i]) {
+ if (max == null || value > max) {
+ max = value;
+ }
+ }
+ groupByResultHolder.setValueForKey(groupKey, max);
+ }
+ });
+ } else {
+ for (int i = 0; i < length; i++) {
+ int groupKey = groupKeyArray[i];
+ long max = groupByResultHolder.getLongResult(groupKey);
+ for (long value : valuesArray[i]) {
+ if (value > max) {
+ max = value;
+ }
+ }
+ groupByResultHolder.setValueForKey(groupKey, max);
+ }
+ }
+ }
+
@Override
public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ if (blockValSet.isSingleValue()) {
+ aggregateSvGroupByMV(blockValSet, length, groupKeysArray,
groupByResultHolder);
+ } else {
+ aggregateMvGroupByMV(blockValSet, length, groupKeysArray,
groupByResultHolder);
+ }
+ }
+
+ private void aggregateSvGroupByMV(BlockValSet blockValSet, int length,
int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder) {
long[] valueArray = blockValSet.getLongValuesSV();
if (_nullHandlingEnabled) {
@@ -151,6 +222,44 @@ public class MaxLongAggregationFunction extends
NullableSingleInputAggregationFu
}
}
+ private void aggregateMvGroupByMV(BlockValSet blockValSet, int length,
int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder) {
+ long[][] valuesArray = blockValSet.getLongValuesMV();
+
+ if (_nullHandlingEnabled) {
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ Long max = null;
+ for (long value : valuesArray[i]) {
+ if (max == null || value > max) {
+ max = value;
+ }
+ }
+
+ for (int groupKey : groupKeysArray[i]) {
+ Long currentMax = groupByResultHolder.getResult(groupKey);
+ if (currentMax == null || (max != null && max > currentMax)) {
+ groupByResultHolder.setValueForKey(groupKey, max);
+ }
+ }
+ }
+ });
+ } else {
+ for (int i = 0; i < length; i++) {
+ long[] values = valuesArray[i];
+ for (int groupKey : groupKeysArray[i]) {
+ long max = groupByResultHolder.getLongResult(groupKey);
+ for (long value : values) {
+ if (value > max) {
+ max = value;
+ }
+ }
+ groupByResultHolder.setValueForKey(groupKey, max);
+ }
+ }
+ }
+ }
+
@Override
public Long extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
if (_nullHandlingEnabled) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
index c48d222abf8..159e543468a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
@@ -68,17 +68,37 @@ public class MinLongAggregationFunction extends
NullableSingleInputAggregationFu
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- long[] values = blockValSet.getLongValuesSV();
- Long min = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
- long innerMin = values[from];
- for (int i = from; i < to; i++) {
- innerMin = Math.min(innerMin, values[i]);
- }
- return acum == null ? innerMin : Math.min(acum, innerMin);
- });
+ if (blockValSet.isSingleValue()) {
+ long[] values = blockValSet.getLongValuesSV();
+
+ Long min = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+ long innerMin = values[from];
+ for (int i = from; i < to; i++) {
+ innerMin = Math.min(innerMin, values[i]);
+ }
+ return acum == null ? innerMin : Math.min(acum, innerMin);
+ });
- updateAggregationResultHolder(aggregationResultHolder, min);
+ updateAggregationResultHolder(aggregationResultHolder, min);
+ } else {
+ long[][] valuesArray = blockValSet.getLongValuesMV();
+
+ Long min = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+ long innerMin = DEFAULT_VALUE;
+ for (int i = from; i < to; i++) {
+ long[] values = valuesArray[i];
+ for (long value : values) {
+ if (value < innerMin) {
+ innerMin = value;
+ }
+ }
+ }
+ return acum == null ? innerMin : Math.min(acum, innerMin);
+ });
+
+ updateAggregationResultHolder(aggregationResultHolder, min);
+ }
}
protected void updateAggregationResultHolder(AggregationResultHolder
aggregationResultHolder, Long min) {
@@ -97,8 +117,17 @@ public class MinLongAggregationFunction extends
NullableSingleInputAggregationFu
public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- long[] valueArray = blockValSet.getLongValuesSV();
+ if (blockValSet.isSingleValue()) {
+ aggregateSvGroupBySv(blockValSet, length, groupKeyArray,
groupByResultHolder);
+ } else {
+ aggregateMvGroupBySv(blockValSet, length, groupKeyArray,
groupByResultHolder);
+ }
+ }
+
+ private void aggregateSvGroupBySv(BlockValSet blockValSet, int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder) {
+ long[] valueArray = blockValSet.getLongValuesSV();
if (_nullHandlingEnabled) {
forEachNotNull(length, blockValSet, (from, to) -> {
for (int i = from; i < to; i++) {
@@ -121,10 +150,51 @@ public class MinLongAggregationFunction extends
NullableSingleInputAggregationFu
}
}
+ private void aggregateMvGroupBySv(BlockValSet blockValSet, int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder) {
+ long[][] valuesArray = blockValSet.getLongValuesMV();
+
+ if (_nullHandlingEnabled) {
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ int groupKey = groupKeyArray[i];
+ Long min = groupByResultHolder.getResult(groupKey);
+ for (long value : valuesArray[i]) {
+ if (min == null || value < min) {
+ min = value;
+ }
+ }
+ groupByResultHolder.setValueForKey(groupKey, min);
+ }
+ });
+ } else {
+ for (int i = 0; i < length; i++) {
+ int groupKey = groupKeyArray[i];
+ long min = groupByResultHolder.getLongResult(groupKey);
+ for (long value : valuesArray[i]) {
+ if (value < min) {
+ min = value;
+ }
+ }
+ groupByResultHolder.setValueForKey(groupKey, min);
+ }
+ }
+ }
+
@Override
public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ if (blockValSet.isSingleValue()) {
+ aggregateSvGroupByMv(blockValSet, length, groupKeysArray,
groupByResultHolder);
+ } else {
+ aggregateMvGroupByMv(blockValSet, length, groupKeysArray,
groupByResultHolder);
+ }
+ }
+
+ private void aggregateSvGroupByMv(BlockValSet blockValSet, int length,
int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder) {
long[] valueArray = blockValSet.getLongValuesSV();
if (_nullHandlingEnabled) {
@@ -151,6 +221,44 @@ public class MinLongAggregationFunction extends
NullableSingleInputAggregationFu
}
}
+ private void aggregateMvGroupByMv(BlockValSet blockValSet, int length,
int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder) {
+ long[][] valuesArray = blockValSet.getLongValuesMV();
+
+ if (_nullHandlingEnabled) {
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ Long min = null;
+ for (long value : valuesArray[i]) {
+ if (min == null || value < min) {
+ min = value;
+ }
+ }
+
+ for (int groupKey : groupKeysArray[i]) {
+ Long currentMin = groupByResultHolder.getResult(groupKey);
+ if (currentMin == null || (min != null && min < currentMin)) {
+ groupByResultHolder.setValueForKey(groupKey, min);
+ }
+ }
+ }
+ });
+ } else {
+ for (int i = 0; i < length; i++) {
+ long[] values = valuesArray[i];
+ for (int groupKey : groupKeysArray[i]) {
+ long min = groupByResultHolder.getLongResult(groupKey);
+ for (long value : values) {
+ if (value < min) {
+ min = value;
+ }
+ }
+ groupByResultHolder.setValueForKey(groupKey, min);
+ }
+ }
+ }
+ }
+
@Override
public Long extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
if (_nullHandlingEnabled) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
index 5f142acad0a..61b0b223079 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
@@ -79,11 +80,18 @@ public class SumLongAggregationFunction extends
NullableSingleInputAggregationFu
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
+ Preconditions.checkArgument(blockValSet.getValueType().getStoredType() ==
DataType.LONG
+ || blockValSet.getValueType().getStoredType() == DataType.INT,
+ "SumLongAggregationFunction only supports integer type columns");
- if (blockValSet.getValueType().getStoredType() != DataType.LONG) {
- throw new IllegalArgumentException("SumLongAggregationFunction only
supports LONG columns");
+ if (blockValSet.isSingleValue()) {
+ aggregateSv(blockValSet, length, aggregationResultHolder);
+ } else {
+ aggregateMv(blockValSet, length, aggregationResultHolder);
}
+ }
+ private void aggregateSv(BlockValSet blockValSet, int length,
AggregationResultHolder aggregationResultHolder) {
long[] values = blockValSet.getLongValuesSV();
// Use foldNotNull with null as initial value - this will return null if
no non-null values are processed
@@ -98,6 +106,23 @@ public class SumLongAggregationFunction extends
NullableSingleInputAggregationFu
updateAggregationResultHolder(aggregationResultHolder, sum);
}
+ private void aggregateMv(BlockValSet blockValSet, int length,
AggregationResultHolder aggregationResultHolder) {
+ long[][] valuesArray = blockValSet.getLongValuesMV();
+
+ Long sum;
+ sum = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+ long innerSum = 0;
+ for (int i = from; i < to; i++) {
+ for (long value : valuesArray[i]) {
+ innerSum += value;
+ }
+ }
+ return acum == null ? innerSum : acum + innerSum;
+ });
+
+ updateAggregationResultHolder(aggregationResultHolder, sum);
+ }
+
private void updateAggregationResultHolder(AggregationResultHolder
aggregationResultHolder, Long sum) {
if (sum != null) {
if (_nullHandlingEnabled) {
@@ -114,6 +139,19 @@ public class SumLongAggregationFunction extends
NullableSingleInputAggregationFu
public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
+ Preconditions.checkArgument(blockValSet.getValueType().getStoredType() ==
DataType.LONG
+ || blockValSet.getValueType().getStoredType() == DataType.INT,
+ "SumLongAggregationFunction only supports integer type columns");
+
+ if (blockValSet.isSingleValue()) {
+ aggregateSvGroupBySv(blockValSet, length, groupKeyArray,
groupByResultHolder);
+ } else {
+ aggregateMvGroupBySv(blockValSet, length, groupKeyArray,
groupByResultHolder);
+ }
+ }
+
+ private void aggregateSvGroupBySv(BlockValSet blockValSet, int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder) {
long[] values = blockValSet.getLongValuesSV();
if (_nullHandlingEnabled) {
@@ -135,10 +173,54 @@ public class SumLongAggregationFunction extends
NullableSingleInputAggregationFu
}
}
+ private void aggregateMvGroupBySv(BlockValSet blockValSet, int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder) {
+ long[][] valuesArray = blockValSet.getLongValuesMV();
+
+ if (_nullHandlingEnabled) {
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ int groupKey = groupKeyArray[i];
+ if (valuesArray[i].length > 0) {
+ // "i" has to be non-null here so we can use the default value as
the initial value instead of null
+ long sum = DEFAULT_VALUE;
+ for (long value : valuesArray[i]) {
+ sum += value;
+ }
+ Long result = groupByResultHolder.getResult(groupKey);
+ groupByResultHolder.setValueForKey(groupKey, result == null ? sum
: result + sum);
+ }
+ }
+ });
+ } else {
+ for (int i = 0; i < length; i++) {
+ int groupKey = groupKeyArray[i];
+ long sum = groupByResultHolder.getLongResult(groupKey);
+ for (long value : valuesArray[i]) {
+ sum += value;
+ }
+ groupByResultHolder.setValueForKey(groupKey, sum);
+ }
+ }
+ }
+
@Override
public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
+ Preconditions.checkArgument(blockValSet.getValueType().getStoredType() ==
DataType.LONG
+ || blockValSet.getValueType().getStoredType() == DataType.INT,
+ "SumLongAggregationFunction only supports integer type columns");
+
+ if (blockValSet.isSingleValue()) {
+ aggregateSvGroupByMv(blockValSet, length, groupKeysArray,
groupByResultHolder);
+ } else {
+ aggregateMvGroupByMv(blockValSet, length, groupKeysArray,
groupByResultHolder);
+ }
+ }
+
+ private void aggregateSvGroupByMv(BlockValSet blockValSet, int length,
int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder) {
long[] values = blockValSet.getLongValuesSV();
if (_nullHandlingEnabled) {
@@ -163,6 +245,41 @@ public class SumLongAggregationFunction extends
NullableSingleInputAggregationFu
}
}
+ private void aggregateMvGroupByMv(BlockValSet blockValSet, int length,
int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder) {
+ long[][] valuesArray = blockValSet.getLongValuesMV();
+
+ if (_nullHandlingEnabled) {
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ long[] values = valuesArray[i];
+ if (values.length > 0) {
+ // "i" has to be non-null here so we can use the default value as
the initial value instead of null
+ long sum = DEFAULT_VALUE;
+ for (long value : values) {
+ sum += value;
+ }
+ for (int groupKey : groupKeysArray[i]) {
+ Long result = groupByResultHolder.getResult(groupKey);
+ groupByResultHolder.setValueForKey(groupKey, result == null ?
sum : result + sum);
+ }
+ }
+ }
+ });
+ } else {
+ for (int i = 0; i < length; i++) {
+ long[] values = valuesArray[i];
+ for (int groupKey : groupKeysArray[i]) {
+ long sum = groupByResultHolder.getLongResult(groupKey);
+ for (long value : values) {
+ sum += value;
+ }
+ groupByResultHolder.setValueForKey(groupKey, sum);
+ }
+ }
+ }
+ }
+
@Override
public Long extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
if (_nullHandlingEnabled) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
index fb0282e5c21..5bcbdc00ebd 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
@@ -193,4 +193,71 @@ public class MaxLongAggregationFunctionTest extends
AbstractAggregationFunctionT
).whenQuery("select maxlong(myField) from testTable")
.thenResultIs("LONG", "" + (Long.MAX_VALUE - 1));
}
+
+ @Test
+ public void aggregationMV() {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"1;2;3"}
+ )
+ .andOnSecondInstance(
+ new Object[]{"null"}
+ )
+ .whenQuery("select maxlong(mv) from testTable")
+ .thenResultIs("LONG", "3")
+ .whenQueryWithNullHandlingEnabled("select maxlong(mv) from testTable")
+ .thenResultIs("LONG", "3");
+ }
+
+ @Test
+ public void aggregationMVGroupBySV() {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+ .addSingleValueDimension("sv", FieldSpec.DataType.STRING)
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"null", "k1"},
+ new Object[]{"1;2;3", "k2"}
+ )
+ .andOnSecondInstance(
+ new Object[]{"null", "k2"},
+ new Object[]{"1;2;3", "k1"}
+ )
+ .whenQuery("select maxlong(mv) from testTable group by sv")
+ .thenResultIs("LONG", "3", "3")
+ .whenQueryWithNullHandlingEnabled("select maxlong(mv) from testTable
group by sv")
+ .thenResultIs("LONG", "3", "3");
+ }
+
+ @Test
+ public void aggregationMVGroupByMV() {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("mv1", FieldSpec.DataType.LONG)
+ .addMultiValueDimension("mv2", FieldSpec.DataType.STRING)
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"1;2", "k1;k2"}
+ )
+ .andOnSecondInstance(
+ new Object[]{"null", "k1;k2"}
+ )
+ .whenQuery("select maxlong(mv1) from testTable group by mv2")
+ .thenResultIs("LONG", "2", "2")
+ .whenQueryWithNullHandlingEnabled("select maxlong(mv1) from testTable
group by mv2")
+ .thenResultIs("LONG", "2", "2");
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
index cea3320314a..430c3e4e2a8 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
@@ -195,4 +195,78 @@ public class MinLongAggregationFunctionTest extends
AbstractAggregationFunctionT
).whenQuery("select minlong(myField) from testTable")
.thenResultIs("LONG", "" + (Long.MAX_VALUE - 5));
}
+
+ @Test
+ public void aggregationMV() {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"1;2;3"}
+ )
+ .andOnSecondInstance(
+ new Object[]{"null"}
+ )
+ .whenQuery("select minlong(mv) from testTable")
+ .thenResultIs("LONG",
+
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
FieldSpec.DataType.LONG, null)))
+ .whenQueryWithNullHandlingEnabled("select minlong(mv) from testTable")
+ .thenResultIs("LONG", "1");
+ }
+
+ @Test
+ public void aggregationMVGroupBySV() {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+ .addSingleValueDimension("sv", FieldSpec.DataType.STRING)
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"null", "k1"},
+ new Object[]{"1;2;3", "k2"}
+ )
+ .andOnSecondInstance(
+ new Object[]{"null", "k2"},
+ new Object[]{"1;2;3", "k1"}
+ )
+ .whenQuery("select minlong(mv) from testTable group by sv")
+ .thenResultIs("LONG", String.valueOf(
+ ((Number)
FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
FieldSpec.DataType.LONG,
+ null)).longValue()), String.valueOf(
+ ((Number)
FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
FieldSpec.DataType.LONG,
+ null)).longValue()))
+ .whenQueryWithNullHandlingEnabled("select minlong(mv) from testTable
group by sv")
+ .thenResultIs("LONG", "1", "1");
+ }
+
+ @Test
+ public void aggregationMVGroupByMV() {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("mv1", FieldSpec.DataType.LONG)
+ .addMultiValueDimension("mv2", FieldSpec.DataType.STRING)
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"1;2", "k1;k2"}
+ )
+ .andOnSecondInstance(
+ new Object[]{"null", "k1;k2"}
+ )
+ .whenQuery("select minlong(mv1) from testTable group by mv2")
+ .thenResultIs("LONG",
+
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
FieldSpec.DataType.LONG, null)),
+
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
FieldSpec.DataType.LONG, null)))
+ .whenQueryWithNullHandlingEnabled("select minlong(mv1) from testTable
group by mv2")
+ .thenResultIs("LONG", "1", "1");
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
index 5bc868b225c..dd6bf22d4e5 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
@@ -176,4 +176,80 @@ public class SumLongAggregationFunctionTest extends
AbstractAggregationFunctionT
"tag3 | null"
);
}
+
+ @Test
+ public void aggregationMV() {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"1;2;3"}
+ )
+ .andOnSecondInstance(
+ new Object[]{"null"}
+ )
+ .whenQuery("select sumlong(mv) from testTable")
+ .thenResultIs("LONG", String.valueOf(
+ 1 + 2 + 3 + (long)
FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
FieldSpec.DataType.LONG,
+ null)))
+ .whenQueryWithNullHandlingEnabled("select sumlong(mv) from testTable")
+ .thenResultIs("LONG",
+ String.valueOf(1 + 2 + 3));
+ }
+
+ @Test
+ public void aggregationMVGroupBySV() {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("mv", FieldSpec.DataType.LONG)
+ .addSingleValueDimension("sv", FieldSpec.DataType.STRING)
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"null", "k1"},
+ new Object[]{"1;2;3", "k2"}
+ )
+ .andOnSecondInstance(
+ new Object[]{"null", "k2"},
+ new Object[]{"1;2;3", "k1"}
+ )
+ .whenQuery("select sumlong(mv) from testTable group by sv")
+ .thenResultIs("LONG", String.valueOf(
+ 6 + (long)
FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
FieldSpec.DataType.LONG, null)),
+ String.valueOf(
+ 6 + (long)
FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
FieldSpec.DataType.LONG, null)))
+ .whenQueryWithNullHandlingEnabled("select sumlong(mv) from testTable
group by sv")
+ .thenResultIs("LONG", "6", "6");
+ }
+
+ @Test
+ public void aggregationMVGroupByMV() {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("mv1", FieldSpec.DataType.LONG)
+ .addMultiValueDimension("mv2", FieldSpec.DataType.STRING)
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"1;2", "k1;k2"}
+ )
+ .andOnSecondInstance(
+ new Object[]{"null", "k1;k2"}
+ )
+ .whenQuery("select sumlong(mv1) from testTable group by mv2")
+ .thenResultIs("LONG", String.valueOf(
+ 3 + (long)
FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
FieldSpec.DataType.LONG, null)),
+ String.valueOf(
+ 3 + (long)
FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
FieldSpec.DataType.LONG, null)))
+ .whenQueryWithNullHandlingEnabled("select sumlong(mv1) from testTable
group by mv2")
+ .thenResultIs("LONG", "3", "3");
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index c019e5c3aa5..d32ed0290ca 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -3070,6 +3070,22 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
}
}
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testMvLongAggregations(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = "SELECT sumlong(DivTotalGTimes), minlong(DivTotalGTimes),
maxlong(DivTotalGTimes) FROM mytable";
+ JsonNode response = postQuery(query);
+ assertNoError(response);
+ JsonNode resultTable = response.get("resultTable");
+ JsonNode dataSchema = resultTable.get("dataSchema");
+ assertEquals(dataSchema.get("columnDataTypes").toString(),
"[\"LONG\",\"LONG\",\"LONG\"]");
+ JsonNode rows = resultTable.get("rows");
+ assertEquals(rows.size(), 1);
+ JsonNode row = rows.get(0);
+ assertEquals(row.size(), 3);
+ }
+
@AfterClass
public void tearDown()
throws Exception {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
index b40f913c4e8..8f479b19c7b 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
@@ -109,6 +109,11 @@ public class TypeSystem extends RelDataTypeSystemImpl {
@Override
public RelDataType deriveSumType(RelDataTypeFactory typeFactory, RelDataType
argumentType) {
+ if (argumentType.getComponentType() != null) {
+ // For MV columns, the return type for SUM is the same as the return
type for SUM on the individual element type.
+ return deriveSumType(typeFactory, argumentType.getComponentType());
+ }
+
assert SqlTypeUtil.isNumeric(argumentType);
switch (argumentType.getSqlTypeName()) {
case TINYINT:
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 2d4bdd9a41f..7a0d8c23028 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -79,7 +79,9 @@ public class QueryEnvironmentTestBase {
TABLE_SCHEMAS.put("c_OFFLINE", getSchemaBuilder("c").build());
TABLE_SCHEMAS.put("d", getSchemaBuilder("d").build());
TABLE_SCHEMAS.put("e", getSchemaBuilder("e")
- .addMultiValueDimension("mcol1", FieldSpec.DataType.STRING).build());
+ .addMultiValueDimension("mcol1", FieldSpec.DataType.STRING)
+ .addMultiValueDimension("mcol2", FieldSpec.DataType.LONG)
+ .build());
}
static Schema.SchemaBuilder getSchemaBuilder(String schemaName) {
@@ -286,7 +288,9 @@ public class QueryEnvironmentTestBase {
// Verify type coercion in standard functions
new Object[]{"SELECT DATEADD('DAY', 1, col7) FROM a"},
new Object[]{"SELECT TIMESTAMPADD(DAY, 10, NOW() - 100) FROM a"},
- new Object[]{"SELECT ts FROM a WHERE ts <= '2025-08-14
00:00:00.000000'"}
+ new Object[]{"SELECT ts FROM a WHERE ts <= '2025-08-14
00:00:00.000000'"},
+ // Aggregations on MV LONG columns
+ new Object[]{"SELECT SUMLONG(mcol2), MINLONG(mcol2), MAXLONG(mcol2)
FROM e"}
};
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 614d8650749..93f6b2755fd 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -55,12 +55,12 @@ public enum AggregationFunctionType {
MAX("max", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
MINSTRING("minString", ReturnTypes.ARG0_NULLABLE_IF_EMPTY,
OperandTypes.CHARACTER),
MAXSTRING("maxString", ReturnTypes.ARG0_NULLABLE_IF_EMPTY,
OperandTypes.CHARACTER),
- MINLONG("minLong", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, OperandTypes.INTEGER),
- MAXLONG("maxLong", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, OperandTypes.INTEGER),
+ MINLONG("minLong", new BigintNullableIfEmpty(),
OperandTypes.or(OperandTypes.INTEGER, OperandTypes.ARRAY_OF_INTEGER)),
+ MAXLONG("maxLong", new BigintNullableIfEmpty(),
OperandTypes.or(OperandTypes.INTEGER, OperandTypes.ARRAY_OF_INTEGER)),
SUM("sum", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
SUM0("$sum0", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
SUMINT("sumInt", ReturnTypes.AGG_SUM, OperandTypes.INTEGER),
- SUMLONG("sumLong", ReturnTypes.AGG_SUM, OperandTypes.INTEGER),
+ SUMLONG("sumLong", ReturnTypes.AGG_SUM,
OperandTypes.or(OperandTypes.INTEGER, OperandTypes.ARRAY_OF_INTEGER)),
SUMPRECISION("sumPrecision", ReturnTypes.explicit(SqlTypeName.DECIMAL),
OperandTypes.ANY, SqlTypeName.OTHER),
AVG("avg", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
MODE("mode", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
@@ -400,4 +400,23 @@ public enum AggregationFunctionType {
return typeFactory.createArrayType(elementType, -1);
}
}
+
+ // Used for aggregation functions that always return BIGINT. The "IfEmpty"
logic ensures that the return type is
+ // nullable for pure aggregation queries (no group-by) and filtered
aggregation queries. Return values can be null
+ // if there are no matching rows (even if the operand type is not nullable).
+ private static class BigintNullableIfEmpty implements SqlReturnTypeInference
{
+ @Override
+ public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+ RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+ if (opBinding.getGroupCount() == 0 || opBinding.hasFilter()) {
+ return
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT),
true);
+ } else {
+ if (opBinding.getOperandType(0).isNullable()) {
+ return
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT),
true);
+ } else {
+ return typeFactory.createSqlType(SqlTypeName.BIGINT);
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org