This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a6a94c57225 Add LONG specific aggregation functions for MIN / MAX /
SUM (#17001)
a6a94c57225 is described below
commit a6a94c57225840af791dd5410820e96bf876b540
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Oct 10 17:01:53 2025 -0700
Add LONG specific aggregation functions for MIN / MAX / SUM (#17001)
---
.../query/NonScanBasedAggregationOperator.java | 39 +++-
.../pinot/core/plan/AggregationPlanNode.java | 6 +-
.../function/AggregationFunctionFactory.java | 6 +
.../function/MaxLongAggregationFunction.java | 206 +++++++++++++++++++
.../function/MinLongAggregationFunction.java | 206 +++++++++++++++++++
.../function/SumLongAggregationFunction.java | 219 +++++++++++++++++++++
.../function/MaxLongAggregationFunctionTest.java | 196 ++++++++++++++++++
.../function/MinLongAggregationFunctionTest.java | 198 +++++++++++++++++++
.../function/SumLongAggregationFunctionTest.java | 179 +++++++++++++++++
.../tests/OfflineClusterIntegrationTest.java | 6 +
.../pinot/segment/spi/AggregationFunctionType.java | 5 +-
11 files changed, 1258 insertions(+), 8 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
index 8a0517a2b2a..8e7f6a41639 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.query;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.dynatrace.hash4j.distinctcount.UltraLogLog;
+import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -50,7 +51,7 @@ import
org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ByteArray;
@@ -101,6 +102,9 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
case MINMV:
result = getMinValueNumeric(dataSource);
break;
+ case MINLONG:
+ result = getMinValueLong(dataSource);
+ break;
case MINSTRING:
assert dataSource.getDictionary() != null;
result = dataSource.getDictionary().getMinVal();
@@ -109,6 +113,9 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
case MAXMV:
result = getMaxValueNumeric(dataSource);
break;
+ case MAXLONG:
+ result = getMaxValueLong(dataSource);
+ break;
case MAXSTRING:
assert dataSource.getDictionary() != null;
result = dataSource.getDictionary().getMaxVal();
@@ -188,6 +195,18 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
return toDouble(dataSource.getDataSourceMetadata().getMinValue());
}
+  /**
+   * Answers a MINLONG query without scanning the column: the minimum is taken from the dictionary when the data
+   * source has one, otherwise from the column metadata.
+   *
+   * @param dataSource data source of the aggregated column; its stored type must be INT or LONG
+   * @return the minimum value widened to a long
+   * @throws IllegalArgumentException if the stored type of the column is neither INT nor LONG
+   */
+  private static Long getMinValueLong(DataSource dataSource) {
+    DataType storedType = dataSource.getDataSourceMetadata().getDataType().getStoredType();
+    boolean integral = storedType == DataType.INT || storedType == DataType.LONG;
+    Preconditions.checkArgument(integral,
+        "MINLONG aggregation function can only be applied to columns of integer types");
+    Dictionary dictionary = dataSource.getDictionary();
+    // Prefer the dictionary; the metadata is only consulted when no dictionary is available.
+    Object minValue = dictionary == null ? dataSource.getDataSourceMetadata().getMinValue() : dictionary.getMinVal();
+    return ((Number) minValue).longValue();
+  }
+
private static Double getMaxValueNumeric(DataSource dataSource) {
Dictionary dictionary = dataSource.getDictionary();
if (dictionary != null) {
@@ -196,6 +215,18 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
return toDouble(dataSource.getDataSourceMetadata().getMaxValue());
}
+  /**
+   * Answers a MAXLONG query without scanning the column: the maximum is taken from the dictionary when the data
+   * source has one, otherwise from the column metadata.
+   *
+   * @param dataSource data source of the aggregated column; its stored type must be INT or LONG
+   * @return the maximum value widened to a long
+   * @throws IllegalArgumentException if the stored type of the column is neither INT nor LONG
+   */
+  private static Long getMaxValueLong(DataSource dataSource) {
+    DataType storedType = dataSource.getDataSourceMetadata().getDataType().getStoredType();
+    boolean integral = storedType == DataType.INT || storedType == DataType.LONG;
+    Preconditions.checkArgument(integral,
+        "MAXLONG aggregation function can only be applied to columns of integer types");
+    Dictionary dictionary = dataSource.getDictionary();
+    // Prefer the dictionary; the metadata is only consulted when no dictionary is available.
+    Object maxValue = dictionary == null ? dataSource.getDataSourceMetadata().getMaxValue() : dictionary.getMaxVal();
+    return ((Number) maxValue).longValue();
+  }
+
private static Double toDouble(Comparable<?> value) {
if (value instanceof Double) {
return (Double) value;
@@ -280,7 +311,7 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary,
DistinctCountHLLAggregationFunction function) {
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+ if (dictionary.getValueType() == DataType.BYTES) {
// Treat BYTES value as serialized HyperLogLog
try {
HyperLogLog hll =
ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(dictionary.getBytesValue(0));
@@ -299,7 +330,7 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
private static HyperLogLogPlus getDistinctCountHLLPlusResult(Dictionary
dictionary,
DistinctCountHLLPlusAggregationFunction function) {
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+ if (dictionary.getValueType() == DataType.BYTES) {
// Treat BYTES value as serialized HyperLogLogPlus
try {
HyperLogLogPlus hllplus =
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(0));
@@ -328,7 +359,7 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
private static UltraLogLog getDistinctCountULLResult(Dictionary dictionary,
DistinctCountULLAggregationFunction function) {
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+ if (dictionary.getValueType() == DataType.BYTES) {
// Treat BYTES value as serialized UltraLogLog and merge
try {
UltraLogLog ull =
ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize(dictionary.getBytesValue(0));
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 1aea069cc91..530cd3d9937 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -49,8 +49,8 @@ import static
org.apache.pinot.segment.spi.AggregationFunctionType.*;
@SuppressWarnings("rawtypes")
public class AggregationPlanNode implements PlanNode {
private static final EnumSet<AggregationFunctionType>
DICTIONARY_BASED_FUNCTIONS =
- EnumSet.of(MIN, MINMV, MINSTRING, MAX, MAXMV, MAXSTRING, MINMAXRANGE,
MINMAXRANGEMV, DISTINCTCOUNT,
- DISTINCTCOUNTMV, DISTINCTSUM, DISTINCTSUMMV, DISTINCTAVG,
DISTINCTAVGMV, DISTINCTCOUNTOFFHEAP,
+ EnumSet.of(MIN, MINMV, MINLONG, MINSTRING, MAX, MAXMV, MAXLONG,
MAXSTRING, MINMAXRANGE, MINMAXRANGEMV,
+ DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTSUM, DISTINCTSUMMV,
DISTINCTAVG, DISTINCTAVGMV, DISTINCTCOUNTOFFHEAP,
DISTINCTCOUNTHLL, DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL,
DISTINCTCOUNTRAWHLLMV, DISTINCTCOUNTHLLPLUS,
DISTINCTCOUNTHLLPLUSMV, DISTINCTCOUNTRAWHLLPLUS,
DISTINCTCOUNTRAWHLLPLUSMV, DISTINCTCOUNTULL,
DISTINCTCOUNTRAWULL, SEGMENTPARTITIONEDDISTINCTCOUNT,
DISTINCTCOUNTSMARTHLL, DISTINCTCOUNTSMARTULL);
@@ -59,7 +59,7 @@ public class AggregationPlanNode implements PlanNode {
// MINSTRING / MAXSTRING excluded because of string column metadata issues
(see discussion in
// https://github.com/apache/pinot/pull/16983)
private static final EnumSet<AggregationFunctionType>
METADATA_BASED_FUNCTIONS =
- EnumSet.of(COUNT, MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV);
+ EnumSet.of(COUNT, MIN, MINMV, MINLONG, MAX, MAXMV, MAXLONG, MINMAXRANGE,
MINMAXRANGEMV);
private final IndexSegment _indexSegment;
private final SegmentContext _segmentContext;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index aebd26c46cb..d6824d75b24 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -220,11 +220,17 @@ public class AggregationFunctionFactory {
return new MinStringAggregationFunction(arguments,
nullHandlingEnabled);
case MAXSTRING:
return new MaxStringAggregationFunction(arguments,
nullHandlingEnabled);
+ case MINLONG:
+ return new MinLongAggregationFunction(arguments,
nullHandlingEnabled);
+ case MAXLONG:
+ return new MaxLongAggregationFunction(arguments,
nullHandlingEnabled);
case SUM:
case SUM0:
return new SumAggregationFunction(arguments, nullHandlingEnabled);
case SUMINT:
return new SumIntAggregationFunction(arguments,
nullHandlingEnabled);
+ case SUMLONG:
+ return new SumLongAggregationFunction(arguments,
nullHandlingEnabled);
case SUMPRECISION:
return new SumPrecisionAggregationFunction(arguments,
nullHandlingEnabled);
case AVG:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
new file mode 100644
index 00000000000..e01b0eeb22e
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.LongAggregateResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.LongGroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Aggregation function for {@code MAXLONG}: tracks the largest value of a single expression, read as primitive
+ * {@code long}s, and reports it as {@code LONG} for both the intermediate and the final result.
+ *
+ * <p>When null handling is disabled the running maximum lives in a primitive long holder seeded with
+ * {@link Long#MIN_VALUE}. When it is enabled the running maximum is a boxed {@link Long} kept in an object holder,
+ * and a {@code null} holder entry is treated as "no value seen yet".
+ */
+public class MaxLongAggregationFunction extends NullableSingleInputAggregationFunction<Long, Long> {
+  /** Seed for the primitive holders used when null handling is disabled. */
+  protected static final long DEFAULT_INITIAL_VALUE = Long.MIN_VALUE;
+
+  /**
+   * @param arguments function arguments; validated to be a single expression
+   * @param nullHandlingEnabled whether null values must be skipped during aggregation
+   */
+  public MaxLongAggregationFunction(List<ExpressionContext> arguments, boolean nullHandlingEnabled) {
+    this(verifySingleArgument(arguments, "MAXLONG"), nullHandlingEnabled);
+  }
+
+  protected MaxLongAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.MAXLONG;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    if (!_nullHandlingEnabled) {
+      return new LongAggregateResultHolder(DEFAULT_INITIAL_VALUE);
+    }
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    if (!_nullHandlingEnabled) {
+      return new LongGroupByResultHolder(initialCapacity, maxCapacity, DEFAULT_INITIAL_VALUE);
+    }
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] longValues = blockValSet.getLongValuesSV();
+
+    // Fold each [from, to) range handed over by foldNotNull into a single maximum for this block.
+    Long blockMax = foldNotNull(length, blockValSet, null, (runningMax, from, to) -> {
+      long rangeMax = longValues[from];
+      for (int docIdx = from + 1; docIdx < to; docIdx++) {
+        long candidate = longValues[docIdx];
+        if (candidate > rangeMax) {
+          rangeMax = candidate;
+        }
+      }
+      if (runningMax == null) {
+        return rangeMax;
+      }
+      return Math.max(runningMax, rangeMax);
+    });
+
+    updateAggregationResultHolder(aggregationResultHolder, blockMax);
+  }
+
+  /**
+   * Merges the maximum of one block into the holder. A {@code null} block maximum leaves the holder untouched.
+   */
+  protected void updateAggregationResultHolder(AggregationResultHolder aggregationResultHolder, Long max) {
+    if (max == null) {
+      return;
+    }
+    if (_nullHandlingEnabled) {
+      Long previous = aggregationResultHolder.getResult();
+      // Kept as a primitive so the same setValue overload is selected as for the non-null-handling path.
+      long merged = previous == null ? max : Math.max(max, previous);
+      aggregationResultHolder.setValue(merged);
+    } else {
+      long previous = aggregationResultHolder.getLongResult();
+      aggregationResultHolder.setValue(Math.max(max, previous));
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] longValues = blockValSet.getLongValuesSV();
+
+    if (!_nullHandlingEnabled) {
+      for (int docIdx = 0; docIdx < length; docIdx++) {
+        long candidate = longValues[docIdx];
+        int groupKey = groupKeyArray[docIdx];
+        if (groupByResultHolder.getLongResult(groupKey) < candidate) {
+          groupByResultHolder.setValueForKey(groupKey, candidate);
+        }
+      }
+      return;
+    }
+
+    forEachNotNull(length, blockValSet, (from, to) -> {
+      for (int docIdx = from; docIdx < to; docIdx++) {
+        long candidate = longValues[docIdx];
+        int groupKey = groupKeyArray[docIdx];
+        Long current = groupByResultHolder.getResult(groupKey);
+        // A null entry means the group has not seen a value yet.
+        if (current == null || current < candidate) {
+          groupByResultHolder.setValueForKey(groupKey, candidate);
+        }
+      }
+    });
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] longValues = blockValSet.getLongValuesSV();
+
+    if (!_nullHandlingEnabled) {
+      for (int docIdx = 0; docIdx < length; docIdx++) {
+        long candidate = longValues[docIdx];
+        for (int groupKey : groupKeysArray[docIdx]) {
+          if (groupByResultHolder.getLongResult(groupKey) < candidate) {
+            groupByResultHolder.setValueForKey(groupKey, candidate);
+          }
+        }
+      }
+      return;
+    }
+
+    forEachNotNull(length, blockValSet, (from, to) -> {
+      for (int docIdx = from; docIdx < to; docIdx++) {
+        long candidate = longValues[docIdx];
+        for (int groupKey : groupKeysArray[docIdx]) {
+          Long current = groupByResultHolder.getResult(groupKey);
+          if (current == null || current < candidate) {
+            groupByResultHolder.setValueForKey(groupKey, candidate);
+          }
+        }
+      }
+    });
+  }
+
+  @Override
+  public Long extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    if (!_nullHandlingEnabled) {
+      return aggregationResultHolder.getLongResult();
+    }
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public Long extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    if (!_nullHandlingEnabled) {
+      return groupByResultHolder.getLongResult(groupKey);
+    }
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  /**
+   * Returns the larger of the two intermediate results. With null handling enabled a {@code null} side yields the
+   * other side; with null handling disabled both sides are unboxed unconditionally.
+   */
+  @Override
+  public Long merge(Long intermediateMaxResult1, Long intermediateMaxResult2) {
+    if (_nullHandlingEnabled) {
+      if (intermediateMaxResult1 == null) {
+        return intermediateMaxResult2;
+      }
+      if (intermediateMaxResult2 == null) {
+        return intermediateMaxResult1;
+      }
+    }
+
+    return intermediateMaxResult1 > intermediateMaxResult2 ? intermediateMaxResult1 : intermediateMaxResult2;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Long extractFinalResult(Long intermediateResult) {
+    // The intermediate result already is the final result.
+    return intermediateResult;
+  }
+
+  @Override
+  public Long mergeFinalResult(Long finalResult1, Long finalResult2) {
+    return merge(finalResult1, finalResult2);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
new file mode 100644
index 00000000000..c48d222abf8
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.LongAggregateResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.LongGroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Aggregation function for {@code MINLONG}: tracks the smallest value of a single expression, read as primitive
+ * {@code long}s, and reports it as {@code LONG} for both the intermediate and the final result.
+ *
+ * <p>When null handling is disabled the running minimum lives in a primitive long holder seeded with
+ * {@link Long#MAX_VALUE}. When it is enabled the running minimum is a boxed {@link Long} kept in an object holder,
+ * and a {@code null} holder entry is treated as "no value seen yet".
+ */
+public class MinLongAggregationFunction extends NullableSingleInputAggregationFunction<Long, Long> {
+  /** Seed for the primitive holders used when null handling is disabled. */
+  protected static final Long DEFAULT_VALUE = Long.MAX_VALUE;
+
+  /**
+   * @param arguments function arguments; validated to be a single expression
+   * @param nullHandlingEnabled whether null values must be skipped during aggregation
+   */
+  public MinLongAggregationFunction(List<ExpressionContext> arguments, boolean nullHandlingEnabled) {
+    this(verifySingleArgument(arguments, "MINLONG"), nullHandlingEnabled);
+  }
+
+  protected MinLongAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.MINLONG;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    if (!_nullHandlingEnabled) {
+      return new LongAggregateResultHolder(DEFAULT_VALUE);
+    }
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    if (!_nullHandlingEnabled) {
+      return new LongGroupByResultHolder(initialCapacity, maxCapacity, DEFAULT_VALUE);
+    }
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] longValues = blockValSet.getLongValuesSV();
+
+    // Fold each [from, to) range handed over by foldNotNull into a single minimum for this block.
+    Long blockMin = foldNotNull(length, blockValSet, null, (runningMin, from, to) -> {
+      long rangeMin = longValues[from];
+      for (int docIdx = from + 1; docIdx < to; docIdx++) {
+        long candidate = longValues[docIdx];
+        if (candidate < rangeMin) {
+          rangeMin = candidate;
+        }
+      }
+      if (runningMin == null) {
+        return rangeMin;
+      }
+      return Math.min(runningMin, rangeMin);
+    });
+
+    updateAggregationResultHolder(aggregationResultHolder, blockMin);
+  }
+
+  /**
+   * Merges the minimum of one block into the holder. A {@code null} block minimum leaves the holder untouched.
+   */
+  protected void updateAggregationResultHolder(AggregationResultHolder aggregationResultHolder, Long min) {
+    if (min == null) {
+      return;
+    }
+    if (_nullHandlingEnabled) {
+      Long previous = aggregationResultHolder.getResult();
+      // Kept as a primitive so the same setValue overload is selected as for the non-null-handling path.
+      long merged = previous == null ? min : Math.min(min, previous);
+      aggregationResultHolder.setValue(merged);
+    } else {
+      long previous = aggregationResultHolder.getLongResult();
+      aggregationResultHolder.setValue(Math.min(min, previous));
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] longValues = blockValSet.getLongValuesSV();
+
+    if (!_nullHandlingEnabled) {
+      for (int docIdx = 0; docIdx < length; docIdx++) {
+        long candidate = longValues[docIdx];
+        int groupKey = groupKeyArray[docIdx];
+        if (groupByResultHolder.getLongResult(groupKey) > candidate) {
+          groupByResultHolder.setValueForKey(groupKey, candidate);
+        }
+      }
+      return;
+    }
+
+    forEachNotNull(length, blockValSet, (from, to) -> {
+      for (int docIdx = from; docIdx < to; docIdx++) {
+        long candidate = longValues[docIdx];
+        int groupKey = groupKeyArray[docIdx];
+        Long current = groupByResultHolder.getResult(groupKey);
+        // A null entry means the group has not seen a value yet.
+        if (current == null || current > candidate) {
+          groupByResultHolder.setValueForKey(groupKey, candidate);
+        }
+      }
+    });
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] longValues = blockValSet.getLongValuesSV();
+
+    if (!_nullHandlingEnabled) {
+      for (int docIdx = 0; docIdx < length; docIdx++) {
+        long candidate = longValues[docIdx];
+        for (int groupKey : groupKeysArray[docIdx]) {
+          if (groupByResultHolder.getLongResult(groupKey) > candidate) {
+            groupByResultHolder.setValueForKey(groupKey, candidate);
+          }
+        }
+      }
+      return;
+    }
+
+    forEachNotNull(length, blockValSet, (from, to) -> {
+      for (int docIdx = from; docIdx < to; docIdx++) {
+        long candidate = longValues[docIdx];
+        for (int groupKey : groupKeysArray[docIdx]) {
+          Long current = groupByResultHolder.getResult(groupKey);
+          if (current == null || current > candidate) {
+            groupByResultHolder.setValueForKey(groupKey, candidate);
+          }
+        }
+      }
+    });
+  }
+
+  @Override
+  public Long extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    if (!_nullHandlingEnabled) {
+      return aggregationResultHolder.getLongResult();
+    }
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public Long extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    if (!_nullHandlingEnabled) {
+      return groupByResultHolder.getLongResult(groupKey);
+    }
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  /**
+   * Returns the smaller of the two intermediate results. With null handling enabled a {@code null} side yields the
+   * other side; with null handling disabled both sides are unboxed unconditionally.
+   */
+  @Override
+  public Long merge(Long intermediateMinResult1, Long intermediateMinResult2) {
+    if (_nullHandlingEnabled) {
+      if (intermediateMinResult1 == null) {
+        return intermediateMinResult2;
+      }
+      if (intermediateMinResult2 == null) {
+        return intermediateMinResult1;
+      }
+    }
+
+    return intermediateMinResult1 < intermediateMinResult2 ? intermediateMinResult1 : intermediateMinResult2;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  @Override
+  public Long extractFinalResult(Long intermediateResult) {
+    // The intermediate result already is the final result.
+    return intermediateResult;
+  }
+
+  @Override
+  public Long mergeFinalResult(Long finalResult1, Long finalResult2) {
+    return merge(finalResult1, finalResult2);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
new file mode 100644
index 00000000000..5f142acad0a
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.LongAggregateResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.LongGroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Specialized LONG sum aggregation function that avoids type conversion
overhead.
+ * This function is optimized for LONG columns and uses native LONG arithmetic.
+ *
+ * Performance optimizations:
+ * - Direct LONG arithmetic without DOUBLE conversion
+ * - Vectorized operations for better CPU utilization
+ * - Minimal object allocations
+ * - Optimized for the specific case of LONG column aggregation
+ * - Proper null handling support using foldNotNull and forEachNotNull
+ */
+public class SumLongAggregationFunction extends
NullableSingleInputAggregationFunction<Long, Long> {
+ public static final String FUNCTION_NAME = "SUMLONG";
+ private static final long DEFAULT_VALUE = 0L;
+
+ public SumLongAggregationFunction(List<ExpressionContext> arguments, boolean
nullHandlingEnabled) {
+ super(arguments.get(0), nullHandlingEnabled);
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.SUMLONG;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ if (_nullHandlingEnabled) {
+ return new ObjectAggregationResultHolder();
+ } else {
+ return new LongAggregateResultHolder(DEFAULT_VALUE);
+ }
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ if (_nullHandlingEnabled) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ } else {
+ return new LongGroupByResultHolder(initialCapacity, maxCapacity,
DEFAULT_VALUE);
+ }
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ if (blockValSet.getValueType().getStoredType() != DataType.LONG) {
+ throw new IllegalArgumentException("SumLongAggregationFunction only
supports LONG columns");
+ }
+
+ long[] values = blockValSet.getLongValuesSV();
+
+ // Use foldNotNull with null as initial value - this will return null if
no non-null values are processed
+ Long sum = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+ long innerSum = 0;
+ for (int i = from; i < to; i++) {
+ innerSum += values[i];
+ }
+ return acum == null ? innerSum : acum + innerSum;
+ });
+
+ updateAggregationResultHolder(aggregationResultHolder, sum);
+ }
+
+ private void updateAggregationResultHolder(AggregationResultHolder
aggregationResultHolder, Long sum) {
+ if (sum != null) {
+ if (_nullHandlingEnabled) {
+ Long otherSum = aggregationResultHolder.getResult();
+ aggregationResultHolder.setValue(otherSum == null ? sum : otherSum +
sum);
+ } else {
+ long otherSum = aggregationResultHolder.getLongResult();
+ aggregationResultHolder.setValue(otherSum + sum);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ long[] values = blockValSet.getLongValuesSV();
+
+ if (_nullHandlingEnabled) {
+ // Use forEachNotNull to handle nulls properly
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ int groupKey = groupKeyArray[i];
+ Long existingResult = groupByResultHolder.getResult(groupKey);
+ groupByResultHolder.setValueForKey(groupKey,
+ (existingResult == null ? values[i] : values[i] +
existingResult));
+ }
+ });
+ } else {
+ // Process all values when null handling is disabled
+ for (int i = 0; i < length; i++) {
+ int groupKey = groupKeyArray[i];
+ groupByResultHolder.setValueForKey(groupKey,
groupByResultHolder.getLongResult(groupKey) + values[i]);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ long[] values = blockValSet.getLongValuesSV();
+
+ if (_nullHandlingEnabled) {
+ // Use forEachNotNull to handle nulls properly
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ long value = values[i];
+ for (int groupKey : groupKeysArray[i]) {
+ Long existingResult = groupByResultHolder.getResult(groupKey);
+ groupByResultHolder.setValueForKey(groupKey, existingResult ==
null ? value : existingResult + value);
+ }
+ }
+ });
+ } else {
+ // Process all values when null handling is disabled
+ for (int i = 0; i < length; i++) {
+ long value = values[i];
+ for (int groupKey : groupKeysArray[i]) {
+ groupByResultHolder.setValueForKey(groupKey,
groupByResultHolder.getLongResult(groupKey) + value);
+ }
+ }
+ }
+ }
+
+ @Override
+ public Long extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ if (_nullHandlingEnabled) {
+ return aggregationResultHolder.getResult();
+ }
+ return aggregationResultHolder.getLongResult();
+ }
+
+ /**
+  * Reads the result for one group key out of the group-by holder.
+  *
+  * <p>With null handling disabled the holder's primitive long result for {@code groupKey} is returned
+  * (boxed). With null handling enabled the holder's object result for {@code groupKey} is returned
+  * as-is; it is presumably {@code null} for a group that saw no value (confirm against the holder
+  * implementation).
+  */
+ @Override
+ public Long extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+   if (!_nullHandlingEnabled) {
+     return groupByResultHolder.getLongResult(groupKey);
+   }
+   return groupByResultHolder.getResult(groupKey);
+ }
+
+ /**
+  * Combines two intermediate sums.
+  *
+  * <p>With null handling disabled a {@code null} operand counts as {@code 0} and the result is never
+  * {@code null}. With null handling enabled a {@code null} operand is skipped: the other operand is
+  * returned unchanged, so the result is {@code null} only when both operands are {@code null}.
+  * The addition is plain {@code long} arithmetic.
+  */
+ @Override
+ public Long merge(Long intermediateResult1, Long intermediateResult2) {
+   boolean firstMissing = intermediateResult1 == null;
+   boolean secondMissing = intermediateResult2 == null;
+
+   if (!_nullHandlingEnabled) {
+     long first = firstMissing ? 0L : intermediateResult1;
+     long second = secondMissing ? 0L : intermediateResult2;
+     return first + second;
+   }
+
+   if (firstMissing) {
+     return intermediateResult2;
+   }
+   if (secondMissing) {
+     return intermediateResult1;
+   }
+   return intermediateResult1 + intermediateResult2;
+ }
+
+ /**
+  * Declares {@code LONG} as the column type of the intermediate result.
+  */
+ @Override
+ public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+ return DataSchema.ColumnDataType.LONG;
+ }
+
+ /**
+  * Declares {@code LONG} as the column type of the final result.
+  */
+ @Override
+ public DataSchema.ColumnDataType getFinalResultColumnType() {
+ return DataSchema.ColumnDataType.LONG;
+ }
+
+ /**
+  * Returns the intermediate result unchanged; no conversion is applied to produce the final result.
+  */
+ @Override
+ public Long extractFinalResult(Long intermediateResult) {
+ return intermediateResult;
+ }
+
+ /**
+  * Combines two final results by delegating to {@code merge}, i.e. exactly like intermediate results.
+  */
+ @Override
+ public Long mergeFinalResult(Long finalResult1, Long finalResult2) {
+ return merge(finalResult1, finalResult2);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
new file mode 100644
index 00000000000..fb0282e5c21
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+/**
+ * Query-level tests for the {@code maxlong} aggregation function. Each parameterized test runs against
+ * an INT and a LONG column and covers plain aggregation, group-by on a literal, and group-by on a
+ * multi-value column, with null handling disabled and enabled.
+ */
+public class MaxLongAggregationFunctionTest extends
AbstractAggregationFunctionTest {
+
+ // Supplies the column data types every "scenarios" test is run with: INT and LONG.
+ @DataProvider(name = "scenarios")
+ Object[] scenarios() {
+ return new Object[] {
+ new DataTypeScenario(FieldSpec.DataType.INT),
+ new DataTypeScenario(FieldSpec.DataType.LONG)
+ };
+ }
+
+ // Null handling off, all rows null: the expected result is the DIMENSION default null value of the
+ // scenario's data type.
+ @Test(dataProvider = "scenarios")
+ void aggregationAllNullsWithNullHandlingDisabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select maxlong(myField) from testTable")
+ .thenResultIs("LONG",
+
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
scenario.getDataType(), null)));
+ }
+
+ // Null handling on, all rows null: the expected result is null.
+ @Test(dataProvider = "scenarios")
+ void aggregationAllNullsWithNullHandlingEnabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select maxlong(myField) from testTable")
+ .thenResultIs("LONG", "null");
+ }
+
+ // Group-by variant of the all-null case with null handling off: the single group is expected to
+ // carry the DIMENSION default null value.
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVAllNullsWithNullHandlingDisabled(DataTypeScenario
scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select 'literal', maxlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | "
+ + FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
scenario.getDataType(), null));
+ }
+
+ // Group-by variant of the all-null case with null handling on: the single group is expected to be null.
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVAllNullsWithNullHandlingEnabled(DataTypeScenario
scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select 'literal', maxlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | null");
+ }
+
+ // Mixed nulls and values with null handling off: the expected max is still 5, i.e. the null rows
+ // do not exceed the real values (presumably because the default null value is below them - confirm).
+ @Test(dataProvider = "scenarios")
+ void aggregationWithNullHandlingDisabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "2",
+ "null",
+ "3"
+ ).andOnSecondInstance("myField",
+ "null",
+ "5",
+ "null"
+ ).whenQuery("select maxlong(myField) from testTable")
+ .thenResultIs("LONG", "5");
+ }
+
+ // Mixed nulls and values with null handling on: the max across both instances is 5.
+ @Test(dataProvider = "scenarios")
+ void aggregationWithNullHandlingEnabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "2",
+ "null",
+ "3"
+ ).andOnSecondInstance("myField",
+ "null",
+ "5",
+ "null"
+ ).whenQuery("select maxlong(myField) from testTable")
+ .thenResultIs("LONG", "5");
+ }
+
+ // Same data as the non-grouped disabled case, grouped by a literal: the single group is expected to be 5.
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVWithNullHandlingDisabled(DataTypeScenario scenario)
{
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "2",
+ "null",
+ "3"
+ ).andOnSecondInstance("myField",
+ "null",
+ "5",
+ "null"
+ ).whenQuery("select 'literal', maxlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | 5");
+ }
+
+ // Null handling on with the second instance entirely null: the group result comes from the first
+ // instance only (5).
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVWithNullHandlingEnabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "3",
+ "null",
+ "5"
+ ).andOnSecondInstance("myField",
+ "null",
+ "null",
+ "null"
+ ).whenQuery("select 'literal', maxlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | 5");
+ }
+
+ // Group-by on the multi-value "tags" column over a metric with negative values and nulls. The same
+ // table is queried twice: first without and then with null handling. The expected rows are
+ // consistent with a null metric being read as 0 in the first query (tag2/tag3 -> 0) and being
+ // skipped in the second (tag2 -> -1, tag3 -> null).
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupByMV(DataTypeScenario scenario) {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("tags", FieldSpec.DataType.STRING)
+ .addMetricField("value", scenario.getDataType())
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"tag1;tag2", -1},
+ new Object[]{"tag2;tag3", null}
+ )
+ .andOnSecondInstance(
+ new Object[]{"tag1;tag2", -2},
+ new Object[]{"tag2;tag3", null}
+ )
+ .whenQuery("select tags, maxlong(value) from testTable group by tags
order by tags")
+ .thenResultIs(
+ "STRING | LONG",
+ "tag1 | -1",
+ "tag2 | 0",
+ "tag3 | 0"
+ )
+ .whenQueryWithNullHandlingEnabled("select tags, maxlong(value) from
testTable group by tags order by tags")
+ .thenResultIs(
+ "STRING | LONG",
+ "tag1 | -1",
+ "tag2 | -1",
+ "tag3 | null"
+ );
+ }
+
+ // LONG-only check with inputs just below Long.MAX_VALUE: the expected result is exactly the largest
+ // input (Long.MAX_VALUE - 1). Presumably this guards against precision loss from a double-based
+ // max - confirm against the commit's intent.
+ @Test
+ public void aggregationOnLargeLongValues() {
+ new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "" + (Long.MAX_VALUE - 6),
+ "" + (Long.MAX_VALUE - 5),
+ "" + (Long.MAX_VALUE - 4)
+ ).andOnSecondInstance("myField",
+ "" + (Long.MAX_VALUE - 3),
+ "" + (Long.MAX_VALUE - 2),
+ "" + (Long.MAX_VALUE - 1)
+ ).whenQuery("select maxlong(myField) from testTable")
+ .thenResultIs("LONG", "" + (Long.MAX_VALUE - 1));
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
new file mode 100644
index 00000000000..cea3320314a
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+/**
+ * Query-level tests for the {@code minlong} aggregation function. Each parameterized test runs against
+ * an INT and a LONG column and covers plain aggregation, group-by on a literal, and group-by on a
+ * multi-value column, with null handling disabled and enabled.
+ */
+public class MinLongAggregationFunctionTest extends
AbstractAggregationFunctionTest {
+
+ // Supplies the column data types every "scenarios" test is run with: INT and LONG.
+ @DataProvider(name = "scenarios")
+ Object[] scenarios() {
+ return new Object[] {
+ new DataTypeScenario(FieldSpec.DataType.INT),
+ new DataTypeScenario(FieldSpec.DataType.LONG)
+ };
+ }
+
+ // Null handling off, all rows null: the expected result is the DIMENSION default null value of the
+ // scenario's data type.
+ @Test(dataProvider = "scenarios")
+ void aggregationAllNullsWithNullHandlingDisabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select minlong(myField) from testTable")
+ .thenResultIs("LONG",
+
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
scenario.getDataType(), null)));
+ }
+
+ // Null handling on, all rows null: the expected result is null.
+ @Test(dataProvider = "scenarios")
+ void aggregationAllNullsWithNullHandlingEnabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select minlong(myField) from testTable")
+ .thenResultIs("LONG", "null");
+ }
+
+ // Group-by variant of the all-null case with null handling off: the single group is expected to
+ // carry the DIMENSION default null value.
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVAllNullsWithNullHandlingDisabled(DataTypeScenario
scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select 'literal', minlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | "
+ + FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
scenario.getDataType(), null));
+ }
+
+ // Group-by variant of the all-null case with null handling on: the single group is expected to be null.
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVAllNullsWithNullHandlingEnabled(DataTypeScenario
scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select 'literal', minlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | null");
+ }
+
+ // Mixed nulls and values with null handling off: the expected minimum is the DIMENSION default null
+ // value rather than 2, consistent with null rows being read as that default and it being the
+ // smallest value present.
+ @Test(dataProvider = "scenarios")
+ void aggregationWithNullHandlingDisabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "5",
+ "null",
+ "3"
+ ).andOnSecondInstance("myField",
+ "null",
+ "2",
+ "null"
+ ).whenQuery("select minlong(myField) from testTable")
+ .thenResultIs("LONG",
+
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
scenario.getDataType(), null)));
+ }
+
+ // Mixed nulls and values with null handling on: the min across both instances is 2.
+ @Test(dataProvider = "scenarios")
+ void aggregationWithNullHandlingEnabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "5",
+ "null",
+ "3"
+ ).andOnSecondInstance("myField",
+ "null",
+ "2",
+ "null"
+ ).whenQuery("select minlong(myField) from testTable")
+ .thenResultIs("LONG", "2");
+ }
+
+ // Same data as the non-grouped disabled case, grouped by a literal: the single group is expected to
+ // be the DIMENSION default null value.
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVWithNullHandlingDisabled(DataTypeScenario scenario)
{
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "5",
+ "null",
+ "3"
+ ).andOnSecondInstance("myField",
+ "null",
+ "2",
+ "null"
+ ).whenQuery("select 'literal', minlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | "
+ + FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION,
scenario.getDataType(), null));
+ }
+
+ // Null handling on with the second instance entirely null: the group result comes from the first
+ // instance only (3).
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVWithNullHandlingEnabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "5",
+ "null",
+ "3"
+ ).andOnSecondInstance("myField",
+ "null",
+ "null",
+ "null"
+ ).whenQuery("select 'literal', minlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | 3");
+ }
+
+ // Group-by on the multi-value "tags" column over a metric with positive values and nulls. The same
+ // table is queried twice: first without and then with null handling. The expected rows are
+ // consistent with a null metric being read as 0 in the first query (tag2/tag3 -> 0) and being
+ // skipped in the second (tag2 -> 1, tag3 -> null).
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupByMV(DataTypeScenario scenario) {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("tags", FieldSpec.DataType.STRING)
+ .addMetricField("value", scenario.getDataType())
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"tag1;tag2", 1},
+ new Object[]{"tag2;tag3", null}
+ )
+ .andOnSecondInstance(
+ new Object[]{"tag1;tag2", 2},
+ new Object[]{"tag2;tag3", null}
+ )
+ .whenQuery("select tags, minlong(value) from testTable group by tags
order by tags")
+ .thenResultIs(
+ "STRING | LONG",
+ "tag1 | 1",
+ "tag2 | 0",
+ "tag3 | 0"
+ )
+ .whenQueryWithNullHandlingEnabled("select tags, minlong(value) from
testTable group by tags order by tags")
+ .thenResultIs(
+ "STRING | LONG",
+ "tag1 | 1",
+ "tag2 | 1",
+ "tag3 | null"
+ );
+ }
+
+ // LONG-only check with inputs at and just below Long.MAX_VALUE: the expected result is exactly the
+ // smallest input (Long.MAX_VALUE - 5). Presumably this guards against precision loss from a
+ // double-based min - confirm against the commit's intent.
+ @Test
+ public void aggregationOnLargeLongValues() {
+ new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "" + Long.MAX_VALUE,
+ "" + (Long.MAX_VALUE - 1),
+ "" + (Long.MAX_VALUE - 2)
+ ).andOnSecondInstance("myField",
+ "" + (Long.MAX_VALUE - 3),
+ "" + (Long.MAX_VALUE - 4),
+ "" + (Long.MAX_VALUE - 5)
+ ).whenQuery("select minlong(myField) from testTable")
+ .thenResultIs("LONG", "" + (Long.MAX_VALUE - 5));
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
new file mode 100644
index 00000000000..5bc868b225c
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+/**
+ * Query-level tests for the {@code sumlong} aggregation function on a LONG column declared as a
+ * METRIC field, covering plain aggregation, group-by on a literal, and group-by on a multi-value
+ * column, with null handling disabled and enabled.
+ */
+public class SumLongAggregationFunctionTest extends
AbstractAggregationFunctionTest {
+
+ // Supplies the column data types every "scenarios" test is run with: LONG only.
+ @DataProvider(name = "scenarios")
+ Object[] scenarios() {
+ return new Object[]{
+ new DataTypeScenario(FieldSpec.DataType.LONG)
+ };
+ }
+
+ // Null handling off, all rows null: the expected result is the METRIC default null value of the
+ // scenario's data type.
+ @Test(dataProvider = "scenarios")
+ void aggregationAllNullsWithNullHandlingDisabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(false, FieldSpec.FieldType.METRIC)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select sumlong(myField) from testTable")
+ .thenResultIs("LONG",
+
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.METRIC,
scenario.getDataType(), null)));
+ }
+
+ // Null handling on, all rows null: the expected result is null.
+ @Test(dataProvider = "scenarios")
+ void aggregationAllNullsWithNullHandlingEnabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(true, FieldSpec.FieldType.METRIC)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select sumlong(myField) from testTable")
+ .thenResultIs("LONG", "null");
+ }
+
+ // Group-by variant of the all-null case with null handling off: the single group is expected to
+ // carry the METRIC default null value.
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVAllNullsWithNullHandlingDisabled(DataTypeScenario
scenario) {
+ scenario.getDeclaringTable(false, FieldSpec.FieldType.METRIC)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select 'literal', sumlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | "
+ + FieldSpec.getDefaultNullValue(FieldSpec.FieldType.METRIC,
scenario.getDataType(), null));
+ }
+
+ // Group-by variant of the all-null case with null handling on: the single group is expected to be null.
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVAllNullsWithNullHandlingEnabled(DataTypeScenario
scenario) {
+ scenario.getDeclaringTable(true, FieldSpec.FieldType.METRIC)
+ .onFirstInstance("myField",
+ "null",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null"
+ ).whenQuery("select 'literal', sumlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | null");
+ }
+
+ // Mixed nulls and values with null handling off: the expected sum is 3 + 5 = 8, consistent with
+ // null rows adding nothing (presumably the METRIC default null value is 0 - confirm).
+ @Test(dataProvider = "scenarios")
+ void aggregationWithNullHandlingDisabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(false, FieldSpec.FieldType.METRIC)
+ .onFirstInstance("myField",
+ "3",
+ "null",
+ "5"
+ ).andOnSecondInstance("myField",
+ "null",
+ "null"
+ ).whenQuery("select sumlong(myField) from testTable")
+ .thenResultIs("LONG", "8");
+ }
+
+ // Mixed nulls and values with null handling on: the sum across both instances is 5 + 2 + 3 = 10.
+ @Test(dataProvider = "scenarios")
+ void aggregationWithNullHandlingEnabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(true, FieldSpec.FieldType.METRIC)
+ .onFirstInstance("myField",
+ "null",
+ "5",
+ "null"
+ ).andOnSecondInstance("myField",
+ "2",
+ "null",
+ "3"
+ ).whenQuery("select sumlong(myField) from testTable")
+ .thenResultIs("LONG", "10");
+ }
+
+ // Grouped by a literal with null handling off: the single group is expected to be 5 + 3 + 2 = 10.
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVWithNullHandlingDisabled(DataTypeScenario scenario)
{
+ scenario.getDeclaringTable(false, FieldSpec.FieldType.METRIC)
+ .onFirstInstance("myField",
+ "5",
+ "null",
+ "3"
+ ).andOnSecondInstance("myField",
+ "null",
+ "2",
+ "null"
+ ).whenQuery("select 'literal', sumlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | 10");
+ }
+
+ // Null handling on with the second instance entirely null: the group sum comes from the first
+ // instance only (5 + 3 = 8).
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupBySVWithNullHandlingEnabled(DataTypeScenario scenario) {
+ scenario.getDeclaringTable(true, FieldSpec.FieldType.METRIC)
+ .onFirstInstance("myField",
+ "5",
+ "null",
+ "3"
+ ).andOnSecondInstance("myField",
+ "null",
+ "null",
+ "null"
+ ).whenQuery("select 'literal', sumlong(myField) from testTable group
by 'literal'")
+ .thenResultIs("STRING | LONG", "literal | 8");
+ }
+
+ // Group-by on the multi-value "tags" column. Here "value" is a single-value dimension declared with
+ // -1 as its third argument (presumably the default null value - confirm). The same table is queried
+ // twice: first without and then with null handling. The expected rows are consistent with each null
+ // counting as -1 in the first query (tag2 -> 1 + 2 - 1 - 1 = 1, tag3 -> -2) and being skipped in
+ // the second (tag2 -> 3, tag3 -> null).
+ @Test(dataProvider = "scenarios")
+ void aggregationGroupByMV(DataTypeScenario scenario) {
+ FluentQueryTest.withBaseDir(_baseDir)
+ .givenTable(
+ new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addMultiValueDimension("tags", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("value", scenario.getDataType(), -1)
+ .build(), SINGLE_FIELD_TABLE_CONFIG)
+ .onFirstInstance(
+ new Object[]{"tag1;tag2", 1},
+ new Object[]{"tag2;tag3", null}
+ )
+ .andOnSecondInstance(
+ new Object[]{"tag1;tag2", 2},
+ new Object[]{"tag2;tag3", null}
+ )
+ .whenQuery("select tags, sumlong(value) from testTable group by tags
order by tags")
+ .thenResultIs(
+ "STRING | LONG",
+ "tag1 | 3",
+ "tag2 | 1",
+ "tag3 | -2"
+ )
+ .whenQueryWithNullHandlingEnabled("select tags, sumlong(value) from
testTable group by tags order by tags")
+ .thenResultIs(
+ "STRING | LONG",
+ "tag1 | 3",
+ "tag2 | 3",
+ "tag3 | null"
+ );
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 0018a5ea10b..c019e5c3aa5 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -3594,6 +3594,12 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
testNonScanAggregationQuery(query);
query = "SELECT MAX(AirlineID) FROM " + tableName;
testNonScanAggregationQuery(query);
+ query = "SELECT MINLONG(AirlineID) FROM " + tableName;
+ h2Query = "SELECT MIN(AirlineID) FROM " + tableName;
+ testNonScanAggregationQuery(query, h2Query);
+ query = "SELECT MAXLONG(AirlineID) FROM " + tableName;
+ h2Query = "SELECT MAX(AirlineID) FROM " + tableName;
+ testNonScanAggregationQuery(query, h2Query);
query = "SELECT MIN_MAX_RANGE(AirlineID) FROM " + tableName;
h2Query = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 7cddf5008c5..614d8650749 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -55,9 +55,12 @@ public enum AggregationFunctionType {
MAX("max", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
MINSTRING("minString", ReturnTypes.ARG0_NULLABLE_IF_EMPTY,
OperandTypes.CHARACTER),
MAXSTRING("maxString", ReturnTypes.ARG0_NULLABLE_IF_EMPTY,
OperandTypes.CHARACTER),
+ MINLONG("minLong", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, OperandTypes.INTEGER),
+ MAXLONG("maxLong", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, OperandTypes.INTEGER),
SUM("sum", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
SUM0("$sum0", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
- SUMINT("sumInt", ReturnTypes.BIGINT, OperandTypes.INTEGER),
+ SUMINT("sumInt", ReturnTypes.AGG_SUM, OperandTypes.INTEGER),
+ SUMLONG("sumLong", ReturnTypes.AGG_SUM, OperandTypes.INTEGER),
SUMPRECISION("sumPrecision", ReturnTypes.explicit(SqlTypeName.DECIMAL),
OperandTypes.ANY, SqlTypeName.OTHER),
AVG("avg", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
MODE("mode", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]