This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a6a94c57225 Add LONG specific aggregation functions for MIN / MAX / 
SUM (#17001)
a6a94c57225 is described below

commit a6a94c57225840af791dd5410820e96bf876b540
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Oct 10 17:01:53 2025 -0700

    Add LONG specific aggregation functions for MIN / MAX / SUM (#17001)
---
 .../query/NonScanBasedAggregationOperator.java     |  39 +++-
 .../pinot/core/plan/AggregationPlanNode.java       |   6 +-
 .../function/AggregationFunctionFactory.java       |   6 +
 .../function/MaxLongAggregationFunction.java       | 206 +++++++++++++++++++
 .../function/MinLongAggregationFunction.java       | 206 +++++++++++++++++++
 .../function/SumLongAggregationFunction.java       | 219 +++++++++++++++++++++
 .../function/MaxLongAggregationFunctionTest.java   | 196 ++++++++++++++++++
 .../function/MinLongAggregationFunctionTest.java   | 198 +++++++++++++++++++
 .../function/SumLongAggregationFunctionTest.java   | 179 +++++++++++++++++
 .../tests/OfflineClusterIntegrationTest.java       |   6 +
 .../pinot/segment/spi/AggregationFunctionType.java |   5 +-
 11 files changed, 1258 insertions(+), 8 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
index 8a0517a2b2a..8e7f6a41639 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.query;
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.dynatrace.hash4j.distinctcount.UltraLogLog;
+import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
 import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -50,7 +51,7 @@ import 
org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
 
 
@@ -101,6 +102,9 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
         case MINMV:
           result = getMinValueNumeric(dataSource);
           break;
+        case MINLONG:
+          result = getMinValueLong(dataSource);
+          break;
         case MINSTRING:
           assert dataSource.getDictionary() != null;
           result = dataSource.getDictionary().getMinVal();
@@ -109,6 +113,9 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
         case MAXMV:
           result = getMaxValueNumeric(dataSource);
           break;
+        case MAXLONG:
+          result = getMaxValueLong(dataSource);
+          break;
         case MAXSTRING:
           assert dataSource.getDictionary() != null;
           result = dataSource.getDictionary().getMaxVal();
@@ -188,6 +195,18 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
     return toDouble(dataSource.getDataSourceMetadata().getMinValue());
   }
 
+  /**
+   * Returns the column minimum as a {@code Long} without scanning: the dictionary's min value when a dictionary is
+   * present, otherwise the min value recorded in the data source metadata.
+   *
+   * @throws IllegalArgumentException if the column's stored type is neither INT nor LONG
+   */
+  private static Long getMinValueLong(DataSource dataSource) {
+    DataType storedType = dataSource.getDataSourceMetadata().getDataType().getStoredType();
+    Preconditions.checkArgument(storedType == DataType.LONG || storedType == DataType.INT,
+        "MINLONG aggregation function can only be applied to columns of integer types");
+    Dictionary dictionary = dataSource.getDictionary();
+    Object minValue = dictionary != null ? dictionary.getMinVal() : dataSource.getDataSourceMetadata().getMinValue();
+    return ((Number) minValue).longValue();
+  }
+
   private static Double getMaxValueNumeric(DataSource dataSource) {
     Dictionary dictionary = dataSource.getDictionary();
     if (dictionary != null) {
@@ -196,6 +215,18 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
     return toDouble(dataSource.getDataSourceMetadata().getMaxValue());
   }
 
+  /**
+   * Returns the column maximum as a {@code Long} without scanning: the dictionary's max value when a dictionary is
+   * present, otherwise the max value recorded in the data source metadata.
+   *
+   * @throws IllegalArgumentException if the column's stored type is neither INT nor LONG
+   */
+  private static Long getMaxValueLong(DataSource dataSource) {
+    DataType storedType = dataSource.getDataSourceMetadata().getDataType().getStoredType();
+    Preconditions.checkArgument(storedType == DataType.LONG || storedType == DataType.INT,
+        "MAXLONG aggregation function can only be applied to columns of integer types");
+    Dictionary dictionary = dataSource.getDictionary();
+    Object maxValue = dictionary != null ? dictionary.getMaxVal() : dataSource.getDataSourceMetadata().getMaxValue();
+    return ((Number) maxValue).longValue();
+  }
+
   private static Double toDouble(Comparable<?> value) {
     if (value instanceof Double) {
       return (Double) value;
@@ -280,7 +311,7 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
 
   private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary,
       DistinctCountHLLAggregationFunction function) {
-    if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+    if (dictionary.getValueType() == DataType.BYTES) {
       // Treat BYTES value as serialized HyperLogLog
       try {
         HyperLogLog hll = 
ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(dictionary.getBytesValue(0));
@@ -299,7 +330,7 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
 
   private static HyperLogLogPlus getDistinctCountHLLPlusResult(Dictionary 
dictionary,
       DistinctCountHLLPlusAggregationFunction function) {
-    if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+    if (dictionary.getValueType() == DataType.BYTES) {
       // Treat BYTES value as serialized HyperLogLogPlus
       try {
         HyperLogLogPlus hllplus = 
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(0));
@@ -328,7 +359,7 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
 
   private static UltraLogLog getDistinctCountULLResult(Dictionary dictionary,
       DistinctCountULLAggregationFunction function) {
-    if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+    if (dictionary.getValueType() == DataType.BYTES) {
       // Treat BYTES value as serialized UltraLogLog and merge
       try {
         UltraLogLog ull = 
ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize(dictionary.getBytesValue(0));
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 1aea069cc91..530cd3d9937 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -49,8 +49,8 @@ import static 
org.apache.pinot.segment.spi.AggregationFunctionType.*;
 @SuppressWarnings("rawtypes")
 public class AggregationPlanNode implements PlanNode {
   private static final EnumSet<AggregationFunctionType> 
DICTIONARY_BASED_FUNCTIONS =
-      EnumSet.of(MIN, MINMV, MINSTRING, MAX, MAXMV, MAXSTRING, MINMAXRANGE, 
MINMAXRANGEMV, DISTINCTCOUNT,
-          DISTINCTCOUNTMV, DISTINCTSUM, DISTINCTSUMMV, DISTINCTAVG, 
DISTINCTAVGMV, DISTINCTCOUNTOFFHEAP,
+      EnumSet.of(MIN, MINMV, MINLONG, MINSTRING, MAX, MAXMV, MAXLONG, 
MAXSTRING, MINMAXRANGE, MINMAXRANGEMV,
+          DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTSUM, DISTINCTSUMMV, 
DISTINCTAVG, DISTINCTAVGMV, DISTINCTCOUNTOFFHEAP,
           DISTINCTCOUNTHLL, DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, 
DISTINCTCOUNTRAWHLLMV, DISTINCTCOUNTHLLPLUS,
           DISTINCTCOUNTHLLPLUSMV, DISTINCTCOUNTRAWHLLPLUS, 
DISTINCTCOUNTRAWHLLPLUSMV, DISTINCTCOUNTULL,
           DISTINCTCOUNTRAWULL, SEGMENTPARTITIONEDDISTINCTCOUNT, 
DISTINCTCOUNTSMARTHLL, DISTINCTCOUNTSMARTULL);
@@ -59,7 +59,7 @@ public class AggregationPlanNode implements PlanNode {
   // MINSTRING / MAXSTRING excluded because of string column metadata issues 
(see discussion in
   // https://github.com/apache/pinot/pull/16983)
   private static final EnumSet<AggregationFunctionType> 
METADATA_BASED_FUNCTIONS =
-      EnumSet.of(COUNT, MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV);
+      EnumSet.of(COUNT, MIN, MINMV, MINLONG, MAX, MAXMV, MAXLONG, MINMAXRANGE, 
MINMAXRANGEMV);
 
   private final IndexSegment _indexSegment;
   private final SegmentContext _segmentContext;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index aebd26c46cb..d6824d75b24 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -220,11 +220,17 @@ public class AggregationFunctionFactory {
             return new MinStringAggregationFunction(arguments, 
nullHandlingEnabled);
           case MAXSTRING:
             return new MaxStringAggregationFunction(arguments, 
nullHandlingEnabled);
+          case MINLONG:
+            return new MinLongAggregationFunction(arguments, 
nullHandlingEnabled);
+          case MAXLONG:
+            return new MaxLongAggregationFunction(arguments, 
nullHandlingEnabled);
           case SUM:
           case SUM0:
             return new SumAggregationFunction(arguments, nullHandlingEnabled);
           case SUMINT:
             return new SumIntAggregationFunction(arguments, 
nullHandlingEnabled);
+          case SUMLONG:
+            return new SumLongAggregationFunction(arguments, 
nullHandlingEnabled);
           case SUMPRECISION:
             return new SumPrecisionAggregationFunction(arguments, 
nullHandlingEnabled);
           case AVG:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
new file mode 100644
index 00000000000..e01b0eeb22e
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunction.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.LongAggregateResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.LongGroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * MAX aggregation function that reads its single input as {@code long} values and keeps the running maximum as a
+ * {@code long}; both the intermediate and the final result column types are LONG.
+ *
+ * <p>When null handling is enabled the function uses object-based result holders, where a {@code null} result means
+ * that no value has been recorded yet. Otherwise it uses primitive long holders initialized to
+ * {@link #DEFAULT_INITIAL_VALUE}.
+ */
+public class MaxLongAggregationFunction extends NullableSingleInputAggregationFunction<Long, Long> {
+  /** Initial holder value when null handling is disabled; no {@code long} compares lower than it. */
+  protected static final long DEFAULT_INITIAL_VALUE = Long.MIN_VALUE;
+
+  public MaxLongAggregationFunction(List<ExpressionContext> arguments, boolean nullHandlingEnabled) {
+    this(verifySingleArgument(arguments, "MAXLONG"), nullHandlingEnabled);
+  }
+
+  protected MaxLongAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.MAXLONG;
+  }
+
+  /** Creates an object holder when null handling is enabled, else a long holder seeded with the initial value. */
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    if (_nullHandlingEnabled) {
+      return new ObjectAggregationResultHolder();
+    }
+    return new LongAggregateResultHolder(DEFAULT_INITIAL_VALUE);
+  }
+
+  /** Creates an object holder when null handling is enabled, else a long holder seeded with the initial value. */
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    if (_nullHandlingEnabled) {
+      return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+    }
+    return new LongGroupByResultHolder(initialCapacity, maxCapacity, DEFAULT_INITIAL_VALUE);
+  }
+
+  /**
+   * Folds the block's long values into a single maximum and merges it into the holder. The fold result stays
+   * {@code null} (and the holder is left untouched) when the reducer never contributes a value.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] values = blockValSet.getLongValuesSV();
+
+    Long max = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+      if (from >= to) {
+        // Defensive: for an empty range values[from] is not part of the range, so it must not be folded in.
+        return acum;
+      }
+      long innerMax = values[from];
+      for (int i = from + 1; i < to; i++) {
+        innerMax = Math.max(innerMax, values[i]);
+      }
+      return acum == null ? innerMax : Math.max(acum, innerMax);
+    });
+
+    updateAggregationResultHolder(aggregationResultHolder, max);
+  }
+
+  /** Merges {@code max} into the holder's current result; a {@code null} max is ignored. */
+  protected void updateAggregationResultHolder(AggregationResultHolder aggregationResultHolder, Long max) {
+    if (max != null) {
+      if (_nullHandlingEnabled) {
+        // Object holder: a null current result means nothing has been recorded yet.
+        Long otherMax = aggregationResultHolder.getResult();
+        aggregationResultHolder.setValue(otherMax == null ? max : Math.max(max, otherMax));
+      } else {
+        long otherMax = aggregationResultHolder.getLongResult();
+        aggregationResultHolder.setValue(Math.max(max, otherMax));
+      }
+    }
+  }
+
+  /** Updates the per-group maximum, where row {@code i} belongs to the single group {@code groupKeyArray[i]}. */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] valueArray = blockValSet.getLongValuesSV();
+
+    if (_nullHandlingEnabled) {
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          long value = valueArray[i];
+          int groupKey = groupKeyArray[i];
+          // A null group result means the group has no recorded value yet, so the first value always wins.
+          Long result = groupByResultHolder.getResult(groupKey);
+          if (result == null || value > result) {
+            groupByResultHolder.setValueForKey(groupKey, value);
+          }
+        }
+      });
+    } else {
+      for (int i = 0; i < length; i++) {
+        long value = valueArray[i];
+        int groupKey = groupKeyArray[i];
+        if (value > groupByResultHolder.getLongResult(groupKey)) {
+          groupByResultHolder.setValueForKey(groupKey, value);
+        }
+      }
+    }
+  }
+
+  /** Updates the per-group maximum, where row {@code i} contributes to every group in {@code groupKeysArray[i]}. */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] valueArray = blockValSet.getLongValuesSV();
+
+    if (_nullHandlingEnabled) {
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          long value = valueArray[i];
+          for (int groupKey : groupKeysArray[i]) {
+            Long result = groupByResultHolder.getResult(groupKey);
+            if (result == null || value > result) {
+              groupByResultHolder.setValueForKey(groupKey, value);
+            }
+          }
+        }
+      });
+    } else {
+      for (int i = 0; i < length; i++) {
+        long value = valueArray[i];
+        for (int groupKey : groupKeysArray[i]) {
+          if (value > groupByResultHolder.getLongResult(groupKey)) {
+            groupByResultHolder.setValueForKey(groupKey, value);
+          }
+        }
+      }
+    }
+  }
+
+  /** Reads the object result (possibly {@code null}) when null handling is enabled, else the primitive result. */
+  @Override
+  public Long extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    if (_nullHandlingEnabled) {
+      return aggregationResultHolder.getResult();
+    }
+    return aggregationResultHolder.getLongResult();
+  }
+
+  /** Reads the group's object result (possibly {@code null}) when null handling is enabled, else the primitive. */
+  @Override
+  public Long extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    if (_nullHandlingEnabled) {
+      return groupByResultHolder.getResult(groupKey);
+    }
+    return groupByResultHolder.getLongResult(groupKey);
+  }
+
+  /**
+   * Returns the larger of the two intermediate results. With null handling enabled a {@code null} operand yields the
+   * other operand; with null handling disabled both operands are unboxed and must be non-null.
+   */
+  @Override
+  public Long merge(Long intermediateMaxResult1, Long intermediateMaxResult2) {
+    if (_nullHandlingEnabled) {
+      if (intermediateMaxResult1 == null) {
+        return intermediateMaxResult2;
+      }
+      if (intermediateMaxResult2 == null) {
+        return intermediateMaxResult1;
+      }
+    }
+    return Math.max(intermediateMaxResult1, intermediateMaxResult2);
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  /** The intermediate result is already the final value, so it is returned unchanged. */
+  @Override
+  public Long extractFinalResult(Long intermediateResult) {
+    return intermediateResult;
+  }
+
+  /** Final results are merged exactly like intermediate results. */
+  @Override
+  public Long mergeFinalResult(Long finalResult1, Long finalResult2) {
+    return merge(finalResult1, finalResult2);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
new file mode 100644
index 00000000000..c48d222abf8
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunction.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.LongAggregateResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.LongGroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * MIN aggregation function that reads its single input as {@code long} values and keeps the running minimum as a
+ * {@code long}; both the intermediate and the final result column types are LONG.
+ *
+ * <p>When null handling is enabled the function uses object-based result holders, where a {@code null} result means
+ * that no value has been recorded yet. Otherwise it uses primitive long holders initialized to
+ * {@link #DEFAULT_VALUE}.
+ */
+public class MinLongAggregationFunction extends NullableSingleInputAggregationFunction<Long, Long> {
+  /**
+   * Initial holder value when null handling is disabled; no {@code long} compares higher than it. Declared as a
+   * primitive so it is not unboxed each time a holder is created.
+   */
+  protected static final long DEFAULT_VALUE = Long.MAX_VALUE;
+
+  public MinLongAggregationFunction(List<ExpressionContext> arguments, boolean nullHandlingEnabled) {
+    this(verifySingleArgument(arguments, "MINLONG"), nullHandlingEnabled);
+  }
+
+  protected MinLongAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.MINLONG;
+  }
+
+  /** Creates an object holder when null handling is enabled, else a long holder seeded with the initial value. */
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    if (_nullHandlingEnabled) {
+      return new ObjectAggregationResultHolder();
+    }
+    return new LongAggregateResultHolder(DEFAULT_VALUE);
+  }
+
+  /** Creates an object holder when null handling is enabled, else a long holder seeded with the initial value. */
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    if (_nullHandlingEnabled) {
+      return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+    }
+    return new LongGroupByResultHolder(initialCapacity, maxCapacity, DEFAULT_VALUE);
+  }
+
+  /**
+   * Folds the block's long values into a single minimum and merges it into the holder. The fold result stays
+   * {@code null} (and the holder is left untouched) when the reducer never contributes a value.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] values = blockValSet.getLongValuesSV();
+
+    Long min = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+      if (from >= to) {
+        // Defensive: for an empty range values[from] is not part of the range, so it must not be folded in.
+        return acum;
+      }
+      long innerMin = values[from];
+      for (int i = from + 1; i < to; i++) {
+        innerMin = Math.min(innerMin, values[i]);
+      }
+      return acum == null ? innerMin : Math.min(acum, innerMin);
+    });
+
+    updateAggregationResultHolder(aggregationResultHolder, min);
+  }
+
+  /** Merges {@code min} into the holder's current result; a {@code null} min is ignored. */
+  protected void updateAggregationResultHolder(AggregationResultHolder aggregationResultHolder, Long min) {
+    if (min != null) {
+      if (_nullHandlingEnabled) {
+        // Object holder: a null current result means nothing has been recorded yet.
+        Long otherMin = aggregationResultHolder.getResult();
+        aggregationResultHolder.setValue(otherMin == null ? min : Math.min(min, otherMin));
+      } else {
+        long otherMin = aggregationResultHolder.getLongResult();
+        aggregationResultHolder.setValue(Math.min(min, otherMin));
+      }
+    }
+  }
+
+  /** Updates the per-group minimum, where row {@code i} belongs to the single group {@code groupKeyArray[i]}. */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] valueArray = blockValSet.getLongValuesSV();
+
+    if (_nullHandlingEnabled) {
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          long value = valueArray[i];
+          int groupKey = groupKeyArray[i];
+          // A null group result means the group has no recorded value yet, so the first value always wins.
+          Long result = groupByResultHolder.getResult(groupKey);
+          if (result == null || value < result) {
+            groupByResultHolder.setValueForKey(groupKey, value);
+          }
+        }
+      });
+    } else {
+      for (int i = 0; i < length; i++) {
+        long value = valueArray[i];
+        int groupKey = groupKeyArray[i];
+        if (value < groupByResultHolder.getLongResult(groupKey)) {
+          groupByResultHolder.setValueForKey(groupKey, value);
+        }
+      }
+    }
+  }
+
+  /** Updates the per-group minimum, where row {@code i} contributes to every group in {@code groupKeysArray[i]}. */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] valueArray = blockValSet.getLongValuesSV();
+
+    if (_nullHandlingEnabled) {
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          long value = valueArray[i];
+          for (int groupKey : groupKeysArray[i]) {
+            Long result = groupByResultHolder.getResult(groupKey);
+            if (result == null || value < result) {
+              groupByResultHolder.setValueForKey(groupKey, value);
+            }
+          }
+        }
+      });
+    } else {
+      for (int i = 0; i < length; i++) {
+        long value = valueArray[i];
+        for (int groupKey : groupKeysArray[i]) {
+          if (value < groupByResultHolder.getLongResult(groupKey)) {
+            groupByResultHolder.setValueForKey(groupKey, value);
+          }
+        }
+      }
+    }
+  }
+
+  /** Reads the object result (possibly {@code null}) when null handling is enabled, else the primitive result. */
+  @Override
+  public Long extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    if (_nullHandlingEnabled) {
+      return aggregationResultHolder.getResult();
+    }
+    return aggregationResultHolder.getLongResult();
+  }
+
+  /** Reads the group's object result (possibly {@code null}) when null handling is enabled, else the primitive. */
+  @Override
+  public Long extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    if (_nullHandlingEnabled) {
+      return groupByResultHolder.getResult(groupKey);
+    }
+    return groupByResultHolder.getLongResult(groupKey);
+  }
+
+  /**
+   * Returns the smaller of the two intermediate results. With null handling enabled a {@code null} operand yields
+   * the other operand; with null handling disabled both operands are unboxed and must be non-null.
+   */
+  @Override
+  public Long merge(Long intermediateMinResult1, Long intermediateMinResult2) {
+    if (_nullHandlingEnabled) {
+      if (intermediateMinResult1 == null) {
+        return intermediateMinResult2;
+      }
+      if (intermediateMinResult2 == null) {
+        return intermediateMinResult1;
+      }
+    }
+    return Math.min(intermediateMinResult1, intermediateMinResult2);
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  /** The intermediate result is already the final value, so it is returned unchanged. */
+  @Override
+  public Long extractFinalResult(Long intermediateResult) {
+    return intermediateResult;
+  }
+
+  /** Final results are merged exactly like intermediate results. */
+  @Override
+  public Long mergeFinalResult(Long finalResult1, Long finalResult2) {
+    return merge(finalResult1, finalResult2);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
new file mode 100644
index 00000000000..5f142acad0a
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunction.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.LongAggregateResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.LongGroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Specialized LONG sum aggregation function that avoids type conversion 
overhead.
+ * This function is optimized for LONG columns and uses native LONG arithmetic.
+ *
+ * Performance optimizations:
+ * - Direct LONG arithmetic without DOUBLE conversion
+ * - Vectorized operations for better CPU utilization
+ * - Minimal object allocations
+ * - Optimized for the specific case of LONG column aggregation
+ * - Proper null handling support using foldNotNull and forEachNotNull
+ */
+public class SumLongAggregationFunction extends 
NullableSingleInputAggregationFunction<Long, Long> {
+  public static final String FUNCTION_NAME = "SUMLONG";
+  private static final long DEFAULT_VALUE = 0L;
+
+  /**
+   * Creates the SUMLONG function over its single argument expression.
+   *
+   * @param arguments function arguments; validated through {@code verifySingleArgument}, the same check the
+   *                  MINLONG / MAXLONG constructors apply (presumably rejecting anything but exactly one
+   *                  argument - confirm against the helper)
+   * @param nullHandlingEnabled whether null values should be skipped and "no value" reported as {@code null}
+   */
+  public SumLongAggregationFunction(List<ExpressionContext> arguments, boolean nullHandlingEnabled) {
+    // Validate instead of blindly taking arguments.get(0), which fails with an IndexOutOfBoundsException on an empty
+    // list and silently ignores any extra arguments.
+    super(verifySingleArgument(arguments, FUNCTION_NAME), nullHandlingEnabled);
+  }
+
+  /** Identifies this function as the {@code SUMLONG} aggregation type. */
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.SUMLONG;
+  }
+
+  /** Creates a long holder seeded with {@code DEFAULT_VALUE}, or an object holder when null handling is enabled. */
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    if (!_nullHandlingEnabled) {
+      return new LongAggregateResultHolder(DEFAULT_VALUE);
+    }
+    return new ObjectAggregationResultHolder();
+  }
+
+  /** Creates a long holder seeded with {@code DEFAULT_VALUE}, or an object holder when null handling is enabled. */
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    if (!_nullHandlingEnabled) {
+      return new LongGroupByResultHolder(initialCapacity, maxCapacity, DEFAULT_VALUE);
+    }
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.getValueType().getStoredType() != DataType.LONG) {
+      throw new IllegalArgumentException("SumLongAggregationFunction only 
supports LONG columns");
+    }
+
+    long[] values = blockValSet.getLongValuesSV();
+
+    // Use foldNotNull with null as initial value - this will return null if 
no non-null values are processed
+    Long sum = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+      long innerSum = 0;
+      for (int i = from; i < to; i++) {
+        innerSum += values[i];
+      }
+      return acum == null ? innerSum : acum + innerSum;
+    });
+
+    updateAggregationResultHolder(aggregationResultHolder, sum);
+  }
+
+  private void updateAggregationResultHolder(AggregationResultHolder 
aggregationResultHolder, Long sum) {
+    if (sum != null) {
+      if (_nullHandlingEnabled) {
+        Long otherSum = aggregationResultHolder.getResult();
+        aggregationResultHolder.setValue(otherSum == null ? sum : otherSum + 
sum);
+      } else {
+        long otherSum = aggregationResultHolder.getLongResult();
+        aggregationResultHolder.setValue(otherSum + sum);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] values = blockValSet.getLongValuesSV();
+
+    if (_nullHandlingEnabled) {
+      // Use forEachNotNull to handle nulls properly
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          int groupKey = groupKeyArray[i];
+          Long existingResult = groupByResultHolder.getResult(groupKey);
+          groupByResultHolder.setValueForKey(groupKey,
+              (existingResult == null ? values[i] : values[i] + 
existingResult));
+        }
+      });
+    } else {
+      // Process all values when null handling is disabled
+      for (int i = 0; i < length; i++) {
+        int groupKey = groupKeyArray[i];
+        groupByResultHolder.setValueForKey(groupKey, 
groupByResultHolder.getLongResult(groupKey) + values[i]);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    long[] values = blockValSet.getLongValuesSV();
+
+    if (_nullHandlingEnabled) {
+      // Use forEachNotNull to handle nulls properly
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          long value = values[i];
+          for (int groupKey : groupKeysArray[i]) {
+            Long existingResult = groupByResultHolder.getResult(groupKey);
+            groupByResultHolder.setValueForKey(groupKey, existingResult == 
null ? value : existingResult + value);
+          }
+        }
+      });
+    } else {
+      // Process all values when null handling is disabled
+      for (int i = 0; i < length; i++) {
+        long value = values[i];
+        for (int groupKey : groupKeysArray[i]) {
+          groupByResultHolder.setValueForKey(groupKey, 
groupByResultHolder.getLongResult(groupKey) + value);
+        }
+      }
+    }
+  }
+
+  @Override
+  public Long extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    if (_nullHandlingEnabled) {
+      return aggregationResultHolder.getResult();
+    }
+    return aggregationResultHolder.getLongResult();
+  }
+
+  @Override
+  public Long extractGroupByResult(GroupByResultHolder groupByResultHolder, 
int groupKey) {
+    if (_nullHandlingEnabled) {
+      return groupByResultHolder.getResult(groupKey);
+    }
+    return groupByResultHolder.getLongResult(groupKey);
+  }
+
+  @Override
+  public Long merge(Long intermediateResult1, Long intermediateResult2) {
+    if (_nullHandlingEnabled) {
+      if (intermediateResult1 == null) {
+        return intermediateResult2;
+      }
+      if (intermediateResult2 == null) {
+        return intermediateResult1;
+      }
+      // Both are non-null
+      return intermediateResult1 + intermediateResult2;
+    } else {
+      long val1 = (intermediateResult1 != null) ? intermediateResult1 : 0L;
+      long val2 = (intermediateResult2 != null) ? intermediateResult2 : 0L;
+      return val1 + val2;
+    }
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  @Override
+  public Long extractFinalResult(Long intermediateResult) {
+    return intermediateResult;
+  }
+
+  @Override
+  public Long mergeFinalResult(Long finalResult1, Long finalResult2) {
+    return merge(finalResult1, finalResult2);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
new file mode 100644
index 00000000000..fb0282e5c21
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MaxLongAggregationFunctionTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class MaxLongAggregationFunctionTest extends 
AbstractAggregationFunctionTest {
+
+  @DataProvider(name = "scenarios")
+  Object[] scenarios() {
+    return new Object[] {
+        new DataTypeScenario(FieldSpec.DataType.INT),
+        new DataTypeScenario(FieldSpec.DataType.LONG)
+    };
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationAllNullsWithNullHandlingDisabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select maxlong(myField) from testTable")
+        .thenResultIs("LONG",
+            
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
scenario.getDataType(), null)));
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationAllNullsWithNullHandlingEnabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select maxlong(myField) from testTable")
+        .thenResultIs("LONG", "null");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVAllNullsWithNullHandlingDisabled(DataTypeScenario 
scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select 'literal', maxlong(myField) from testTable group 
by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | "
+            + FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
scenario.getDataType(), null));
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVAllNullsWithNullHandlingEnabled(DataTypeScenario 
scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select 'literal', maxlong(myField) from testTable group 
by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | null");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationWithNullHandlingDisabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "2",
+            "null",
+            "3"
+        ).andOnSecondInstance("myField",
+            "null",
+            "5",
+            "null"
+        ).whenQuery("select maxlong(myField) from testTable")
+        .thenResultIs("LONG", "5");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationWithNullHandlingEnabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "2",
+            "null",
+            "3"
+        ).andOnSecondInstance("myField",
+            "null",
+            "5",
+            "null"
+        ).whenQuery("select maxlong(myField) from testTable")
+        .thenResultIs("LONG", "5");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVWithNullHandlingDisabled(DataTypeScenario scenario) 
{
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "2",
+            "null",
+            "3"
+        ).andOnSecondInstance("myField",
+            "null",
+            "5",
+            "null"
+        ).whenQuery("select 'literal', maxlong(myField) from testTable group 
by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | 5");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVWithNullHandlingEnabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "3",
+            "null",
+            "5"
+        ).andOnSecondInstance("myField",
+            "null",
+            "null",
+            "null"
+        ).whenQuery("select 'literal', maxlong(myField) from testTable group 
by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | 5");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupByMV(DataTypeScenario scenario) {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("tags", FieldSpec.DataType.STRING)
+                .addMetricField("value", scenario.getDataType())
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"tag1;tag2", -1},
+            new Object[]{"tag2;tag3", null}
+        )
+        .andOnSecondInstance(
+            new Object[]{"tag1;tag2", -2},
+            new Object[]{"tag2;tag3", null}
+        )
+        .whenQuery("select tags, maxlong(value) from testTable group by tags 
order by tags")
+        .thenResultIs(
+            "STRING | LONG",
+            "tag1    | -1",
+            "tag2    | 0",
+            "tag3    | 0"
+        )
+        .whenQueryWithNullHandlingEnabled("select tags, maxlong(value) from 
testTable group by tags order by tags")
+        .thenResultIs(
+            "STRING | LONG",
+            "tag1    | -1",
+            "tag2    | -1",
+            "tag3    | null"
+        );
+  }
+
+  @Test
+  public void aggregationOnLargeLongValues() {
+    new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "" + (Long.MAX_VALUE - 6),
+            "" + (Long.MAX_VALUE - 5),
+            "" + (Long.MAX_VALUE - 4)
+        ).andOnSecondInstance("myField",
+            "" + (Long.MAX_VALUE - 3),
+            "" + (Long.MAX_VALUE - 2),
+            "" + (Long.MAX_VALUE - 1)
+        ).whenQuery("select maxlong(myField) from testTable")
+        .thenResultIs("LONG", "" + (Long.MAX_VALUE - 1));
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
new file mode 100644
index 00000000000..cea3320314a
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinLongAggregationFunctionTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class MinLongAggregationFunctionTest extends 
AbstractAggregationFunctionTest {
+
+  @DataProvider(name = "scenarios")
+  Object[] scenarios() {
+    return new Object[] {
+        new DataTypeScenario(FieldSpec.DataType.INT),
+        new DataTypeScenario(FieldSpec.DataType.LONG)
+    };
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationAllNullsWithNullHandlingDisabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select minlong(myField) from testTable")
+        .thenResultIs("LONG",
+            
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
scenario.getDataType(), null)));
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationAllNullsWithNullHandlingEnabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select minlong(myField) from testTable")
+        .thenResultIs("LONG", "null");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVAllNullsWithNullHandlingDisabled(DataTypeScenario 
scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select 'literal', minlong(myField) from testTable group 
by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | "
+            + FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
scenario.getDataType(), null));
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVAllNullsWithNullHandlingEnabled(DataTypeScenario 
scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select 'literal', minlong(myField) from testTable group 
by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | null");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationWithNullHandlingDisabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "5",
+            "null",
+            "3"
+        ).andOnSecondInstance("myField",
+            "null",
+            "2",
+            "null"
+        ).whenQuery("select minlong(myField) from testTable")
+        .thenResultIs("LONG",
+            
String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
scenario.getDataType(), null)));
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationWithNullHandlingEnabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "5",
+            "null",
+            "3"
+        ).andOnSecondInstance("myField",
+            "null",
+            "2",
+            "null"
+        ).whenQuery("select minlong(myField) from testTable")
+        .thenResultIs("LONG", "2");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVWithNullHandlingDisabled(DataTypeScenario scenario) 
{
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "5",
+            "null",
+            "3"
+        ).andOnSecondInstance("myField",
+            "null",
+            "2",
+            "null"
+        ).whenQuery("select 'literal', minlong(myField) from testTable group 
by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | "
+            + FieldSpec.getDefaultNullValue(FieldSpec.FieldType.DIMENSION, 
scenario.getDataType(), null));
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVWithNullHandlingEnabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "5",
+            "null",
+            "3"
+        ).andOnSecondInstance("myField",
+            "null",
+            "null",
+            "null"
+        ).whenQuery("select 'literal', minlong(myField) from testTable group 
by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | 3");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupByMV(DataTypeScenario scenario) {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("tags", FieldSpec.DataType.STRING)
+                .addMetricField("value", scenario.getDataType())
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"tag1;tag2", 1},
+            new Object[]{"tag2;tag3", null}
+        )
+        .andOnSecondInstance(
+            new Object[]{"tag1;tag2", 2},
+            new Object[]{"tag2;tag3", null}
+        )
+        .whenQuery("select tags, minlong(value) from testTable group by tags 
order by tags")
+        .thenResultIs(
+            "STRING | LONG",
+            "tag1    | 1",
+            "tag2    | 0",
+            "tag3    | 0"
+        )
+        .whenQueryWithNullHandlingEnabled("select tags, minlong(value) from 
testTable group by tags order by tags")
+        .thenResultIs(
+            "STRING | LONG",
+            "tag1    | 1",
+            "tag2    | 1",
+            "tag3    | null"
+        );
+  }
+
+  @Test
+  public void aggregationOnLargeLongValues() {
+    new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "" + Long.MAX_VALUE,
+            "" + (Long.MAX_VALUE - 1),
+            "" + (Long.MAX_VALUE - 2)
+        ).andOnSecondInstance("myField",
+            "" + (Long.MAX_VALUE - 3),
+            "" + (Long.MAX_VALUE - 4),
+            "" + (Long.MAX_VALUE - 5)
+        ).whenQuery("select minlong(myField) from testTable")
+        .thenResultIs("LONG", "" + (Long.MAX_VALUE - 5));
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
new file mode 100644
index 00000000000..5bc868b225c
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/SumLongAggregationFunctionTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+/**
+ * Query-level tests for {@code sumlong}. All scenarios use a LONG column; the single-column tables declare it as a
+ * METRIC field.
+ */
+public class SumLongAggregationFunctionTest extends AbstractAggregationFunctionTest {
+
+  @DataProvider(name = "scenarios")
+  Object[] scenarios() {
+    return new Object[]{
+        new DataTypeScenario(FieldSpec.DataType.LONG)
+    };
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationAllNullsWithNullHandlingDisabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(false, FieldSpec.FieldType.METRIC)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select sumlong(myField) from testTable")
+        .thenResultIs("LONG",
+            String.valueOf(FieldSpec.getDefaultNullValue(FieldSpec.FieldType.METRIC, scenario.getDataType(), null)));
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationAllNullsWithNullHandlingEnabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(true, FieldSpec.FieldType.METRIC)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select sumlong(myField) from testTable")
+        .thenResultIs("LONG", "null");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVAllNullsWithNullHandlingDisabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(false, FieldSpec.FieldType.METRIC)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select 'literal', sumlong(myField) from testTable group by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | "
+            + FieldSpec.getDefaultNullValue(FieldSpec.FieldType.METRIC, scenario.getDataType(), null));
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVAllNullsWithNullHandlingEnabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(true, FieldSpec.FieldType.METRIC)
+        .onFirstInstance("myField",
+            "null",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null"
+        ).whenQuery("select 'literal', sumlong(myField) from testTable group by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | null");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationWithNullHandlingDisabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(false, FieldSpec.FieldType.METRIC)
+        .onFirstInstance("myField",
+            "3",
+            "null",
+            "5"
+        ).andOnSecondInstance("myField",
+            "null",
+            "null"
+        ).whenQuery("select sumlong(myField) from testTable")
+        .thenResultIs("LONG", "8");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationWithNullHandlingEnabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(true, FieldSpec.FieldType.METRIC)
+        .onFirstInstance("myField",
+            "null",
+            "5",
+            "null"
+        ).andOnSecondInstance("myField",
+            "2",
+            "null",
+            "3"
+        ).whenQuery("select sumlong(myField) from testTable")
+        .thenResultIs("LONG", "10");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVWithNullHandlingDisabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(false, FieldSpec.FieldType.METRIC)
+        .onFirstInstance("myField",
+            "5",
+            "null",
+            "3"
+        ).andOnSecondInstance("myField",
+            "null",
+            "2",
+            "null"
+        ).whenQuery("select 'literal', sumlong(myField) from testTable group by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | 10");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVWithNullHandlingEnabled(DataTypeScenario scenario) {
+    scenario.getDeclaringTable(true, FieldSpec.FieldType.METRIC)
+        .onFirstInstance("myField",
+            "5",
+            "null",
+            "3"
+        ).andOnSecondInstance("myField",
+            "null",
+            "null",
+            "null"
+        ).whenQuery("select 'literal', sumlong(myField) from testTable group by 'literal'")
+        .thenResultIs("STRING | LONG", "literal | 8");
+  }
+
+  /**
+   * Groups by a multi-value column. Here "value" is declared as a single-value dimension with -1 as its default null
+   * value, which is why the results without null handling contain the -1 contributions of the null rows.
+   */
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupByMV(DataTypeScenario scenario) {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .givenTable(
+            new Schema.SchemaBuilder()
+                .setSchemaName("testTable")
+                .setEnableColumnBasedNullHandling(true)
+                .addMultiValueDimension("tags", FieldSpec.DataType.STRING)
+                .addSingleValueDimension("value", scenario.getDataType(), -1)
+                .build(), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{"tag1;tag2", 1},
+            new Object[]{"tag2;tag3", null}
+        )
+        .andOnSecondInstance(
+            new Object[]{"tag1;tag2", 2},
+            new Object[]{"tag2;tag3", null}
+        )
+        .whenQuery("select tags, sumlong(value) from testTable group by tags order by tags")
+        .thenResultIs(
+            "STRING | LONG",
+            "tag1    | 3",
+            "tag2    | 1",
+            "tag3    | -2"
+        )
+        .whenQueryWithNullHandlingEnabled("select tags, sumlong(value) from testTable group by tags order by tags")
+        .thenResultIs(
+            "STRING | LONG",
+            "tag1    | 3",
+            "tag2    | 3",
+            "tag3    | null"
+        );
+  }
+
+  /**
+   * Counterpart of the large-value tests of the MINLONG / MAXLONG test classes: the operands add up to exactly
+   * Long.MAX_VALUE, a total far above 2^53 that a DOUBLE based sum could not represent exactly. No partial sum
+   * exceeds Long.MAX_VALUE, so the expectation does not depend on overflow behaviour.
+   */
+  @Test
+  public void aggregationOnLargeLongValues() {
+    new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true, FieldSpec.FieldType.METRIC)
+        .onFirstInstance("myField",
+            "" + (Long.MAX_VALUE - 3),
+            "1"
+        ).andOnSecondInstance("myField",
+            "2"
+        ).whenQuery("select sumlong(myField) from testTable")
+        .thenResultIs("LONG", "" + Long.MAX_VALUE);
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 0018a5ea10b..c019e5c3aa5 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -3594,6 +3594,12 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     testNonScanAggregationQuery(query);
     query = "SELECT MAX(AirlineID) FROM " + tableName;
     testNonScanAggregationQuery(query);
+    query = "SELECT MINLONG(AirlineID) FROM " + tableName;
+    h2Query = "SELECT MIN(AirlineID) FROM " + tableName;
+    testNonScanAggregationQuery(query, h2Query);
+    query = "SELECT MAXLONG(AirlineID) FROM " + tableName;
+    h2Query = "SELECT MAX(AirlineID) FROM " + tableName;
+    testNonScanAggregationQuery(query, h2Query);
     query = "SELECT MIN_MAX_RANGE(AirlineID) FROM " + tableName;
     h2Query = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + tableName;
     testNonScanAggregationQuery(query, h2Query);
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 7cddf5008c5..614d8650749 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -55,9 +55,12 @@ public enum AggregationFunctionType {
   MAX("max", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
   MINSTRING("minString", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, 
OperandTypes.CHARACTER),
   MAXSTRING("maxString", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, 
OperandTypes.CHARACTER),
+  MINLONG("minLong", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, OperandTypes.INTEGER),
+  MAXLONG("maxLong", ReturnTypes.ARG0_NULLABLE_IF_EMPTY, OperandTypes.INTEGER),
   SUM("sum", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
   SUM0("$sum0", SqlTypeName.DOUBLE, SqlTypeName.DOUBLE),
-  SUMINT("sumInt", ReturnTypes.BIGINT, OperandTypes.INTEGER),
+  SUMINT("sumInt", ReturnTypes.AGG_SUM, OperandTypes.INTEGER),
+  SUMLONG("sumLong", ReturnTypes.AGG_SUM, OperandTypes.INTEGER),
   SUMPRECISION("sumPrecision", ReturnTypes.explicit(SqlTypeName.DECIMAL), 
OperandTypes.ANY, SqlTypeName.OTHER),
   AVG("avg", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
   MODE("mode", SqlTypeName.OTHER, SqlTypeName.DOUBLE),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to