This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cb3ccdcaee First with time (#12235)
cb3ccdcaee is described below

commit cb3ccdcaeec4387adced4e7210435ec6260807c7
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Wed Mar 20 00:35:42 2024 -0700

    First with time (#12235)
    
    * Support nullHandling in FirstWithTime
    
    * Add last_with_time as well
---
 .../function/AggregationFunctionFactory.java       |  26 +--
 ...irstDoubleValueWithTimeAggregationFunction.java |  55 +++---
 ...FirstFloatValueWithTimeAggregationFunction.java |  55 +++---
 .../FirstIntValueWithTimeAggregationFunction.java  |  54 +++---
 .../FirstLongValueWithTimeAggregationFunction.java |  55 +++---
 ...irstStringValueWithTimeAggregationFunction.java |  55 +++---
 .../function/FirstWithTimeAggregationFunction.java |  94 +++++++----
 ...LastDoubleValueWithTimeAggregationFunction.java |  55 +++---
 .../LastFloatValueWithTimeAggregationFunction.java |  55 +++---
 .../LastIntValueWithTimeAggregationFunction.java   |  54 +++---
 .../LastLongValueWithTimeAggregationFunction.java  |  55 +++---
 ...LastStringValueWithTimeAggregationFunction.java |  55 +++---
 .../function/LastWithTimeAggregationFunction.java  |  34 +++-
 .../NullableSingleInputAggregationFunction.java    | 113 +++++++++++++
 .../FirstWithTimeAggregationFunctionTest.java      | 186 +++++++++++++++++++++
 .../LastWithTimeAggregationFunctionTest.java       | 174 +++++++++++++++++++
 .../segment/local/customobject/ValueLongPair.java  |   8 +
 17 files changed, 831 insertions(+), 352 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 3c449f1578..eeed8608a4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -221,17 +221,19 @@ public class AggregationFunctionFactory {
             DataType dataType = 
DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase());
             switch (dataType) {
               case BOOLEAN:
-                return new 
FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, true);
+                return new 
FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled,
+                    true);
               case INT:
-                return new 
FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, false);
+                return new 
FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled,
+                    false);
               case LONG:
-                return new 
FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol);
+                return new 
FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled);
               case FLOAT:
-                return new 
FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
+                return new 
FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled);
               case DOUBLE:
-                return new 
FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
+                return new 
FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled);
               case STRING:
-                return new 
FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol);
+                return new 
FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled);
               default:
                 throw new IllegalArgumentException("Unsupported data type for 
FIRST_WITH_TIME: " + dataType);
             }
@@ -300,17 +302,17 @@ public class AggregationFunctionFactory {
             DataType dataType = 
DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase());
             switch (dataType) {
               case BOOLEAN:
-                return new 
LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, true);
+                return new 
LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled, true);
               case INT:
-                return new 
LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, false);
+                return new 
LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled, false);
               case LONG:
-                return new 
LastLongValueWithTimeAggregationFunction(firstArgument, timeCol);
+                return new 
LastLongValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled);
               case FLOAT:
-                return new 
LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
+                return new 
LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled);
               case DOUBLE:
-                return new 
LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
+                return new 
LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled);
               case STRING:
-                return new 
LastStringValueWithTimeAggregationFunction(firstArgument, timeCol);
+                return new 
LastStringValueWithTimeAggregationFunction(firstArgument, timeCol, 
nullHandlingEnabled);
               default:
                 throw new IllegalArgumentException("Unsupported data type for 
LAST_WITH_TIME: " + dataType);
             }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
index 270a86d671..46d61f225d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.DoubleLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -41,8 +41,9 @@ import 
org.apache.pinot.segment.local.customobject.ValueLongPair;
 public class FirstDoubleValueWithTimeAggregationFunction extends 
FirstWithTimeAggregationFunction<Double> {
   private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR = new 
DoubleLongPair(Double.NaN, Long.MAX_VALUE);
 
-  public FirstDoubleValueWithTimeAggregationFunction(ExpressionContext 
dataCol, ExpressionContext timeCol) {
-    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+  public FirstDoubleValueWithTimeAggregationFunction(ExpressionContext 
dataCol, ExpressionContext timeCol,
+      boolean nullHandlingEnabled) {
+    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE, 
nullHandlingEnabled);
   }
 
   @Override
@@ -56,22 +57,8 @@ public class FirstDoubleValueWithTimeAggregationFunction 
extends FirstWithTimeAg
   }
 
   @Override
-  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet) {
-    ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
-    Double firstData = defaultValueLongPair.getValue();
-    long firstTime = defaultValueLongPair.getTime();
-    double[] doubleValues = blockValSet.getDoubleValuesSV();
-    long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      double data = doubleValues[i];
-      long time = timeValues[i];
-      if (time <= firstTime) {
-        firstTime = time;
-        firstData = data;
-      }
-    }
-    setAggregationResult(aggregationResultHolder, firstData, firstTime);
+  public Double readCell(BlockValSet block, int docId) {
+    return block.getDoubleValuesSV()[docId];
   }
 
   @Override
@@ -79,11 +66,15 @@ public class FirstDoubleValueWithTimeAggregationFunction 
extends FirstWithTimeAg
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     double[] doubleValues = blockValSet.getDoubleValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      double data = doubleValues[i];
-      long time = timeValues[i];
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
-    }
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        double data = doubleValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    });
   }
 
   @Override
@@ -91,13 +82,17 @@ public class FirstDoubleValueWithTimeAggregationFunction 
extends FirstWithTimeAg
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     double[] doubleValues = blockValSet.getDoubleValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      double value = doubleValues[i];
-      long time = timeValues[i];
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        double value = doubleValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
index e4c4da2400..d34fe755ec 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.FloatLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -41,8 +41,9 @@ import 
org.apache.pinot.segment.local.customobject.ValueLongPair;
 public class FirstFloatValueWithTimeAggregationFunction extends 
FirstWithTimeAggregationFunction<Float> {
   private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new 
FloatLongPair(Float.NaN, Long.MAX_VALUE);
 
-  public FirstFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol) {
-    super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE);
+  public FirstFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol,
+      boolean nullHandlingEnabled) {
+    super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE, 
nullHandlingEnabled);
   }
 
   @Override
@@ -56,22 +57,8 @@ public class FirstFloatValueWithTimeAggregationFunction 
extends FirstWithTimeAgg
   }
 
   @Override
-  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet) {
-    ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair();
-    Float firstData = defaultValueLongPair.getValue();
-    long firstTime = defaultValueLongPair.getTime();
-    float[] floatValues = blockValSet.getFloatValuesSV();
-    long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      float data = floatValues[i];
-      long time = timeValues[i];
-      if (time <= firstTime) {
-        firstTime = time;
-        firstData = data;
-      }
-    }
-    setAggregationResult(aggregationResultHolder, firstData, firstTime);
+  public Float readCell(BlockValSet block, int docId) {
+    return block.getFloatValuesSV()[docId];
   }
 
   @Override
@@ -79,11 +66,15 @@ public class FirstFloatValueWithTimeAggregationFunction 
extends FirstWithTimeAgg
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     float[] floatValues = blockValSet.getFloatValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      float data = floatValues[i];
-      long time = timeValues[i];
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
-    }
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        float data = floatValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    });
   }
 
   @Override
@@ -91,13 +82,17 @@ public class FirstFloatValueWithTimeAggregationFunction 
extends FirstWithTimeAgg
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     float[] floatValues = blockValSet.getFloatValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      float value = floatValues[i];
-      long time = timeValues[i];
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        float value = floatValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
index 8623f0bdc5..97b5829968 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.IntLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -46,8 +46,8 @@ public class FirstIntValueWithTimeAggregationFunction extends 
FirstWithTimeAggre
   private final boolean _isBoolean;
 
   public FirstIntValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol,
-      boolean isBoolean) {
-    super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE);
+      boolean nullHandlingEnabled, boolean isBoolean) {
+    super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE, 
nullHandlingEnabled);
     _isBoolean = isBoolean;
   }
 
@@ -62,22 +62,8 @@ public class FirstIntValueWithTimeAggregationFunction 
extends FirstWithTimeAggre
   }
 
   @Override
-  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet) {
-    ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair();
-    Integer firstData = defaultValueLongPair.getValue();
-    long firstTime = defaultValueLongPair.getTime();
-    int[] intValues = blockValSet.getIntValuesSV();
-    long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      int data = intValues[i];
-      long time = timeValues[i];
-      if (time <= firstTime) {
-        firstTime = time;
-        firstData = data;
-      }
-    }
-    setAggregationResult(aggregationResultHolder, firstData, firstTime);
+  public Integer readCell(BlockValSet block, int docId) {
+    return block.getIntValuesSV()[docId];
   }
 
   @Override
@@ -85,11 +71,15 @@ public class FirstIntValueWithTimeAggregationFunction 
extends FirstWithTimeAggre
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     int[] intValues = blockValSet.getIntValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      int data = intValues[i];
-      long time = timeValues[i];
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
-    }
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        int data = intValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    });
   }
 
   @Override
@@ -97,13 +87,17 @@ public class FirstIntValueWithTimeAggregationFunction 
extends FirstWithTimeAggre
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     int[] intValues = blockValSet.getIntValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      int value = intValues[i];
-      long time = timeValues[i];
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        int value = intValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
index 0f5afeaa97..9b1fd34d2a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.LongLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -41,8 +41,9 @@ import 
org.apache.pinot.segment.local.customobject.ValueLongPair;
 public class FirstLongValueWithTimeAggregationFunction extends 
FirstWithTimeAggregationFunction<Long> {
   private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR = new 
LongLongPair(Long.MIN_VALUE, Long.MAX_VALUE);
 
-  public FirstLongValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol) {
-    super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE);
+  public FirstLongValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol,
+      boolean nullHandlingEnabled) {
+    super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE, 
nullHandlingEnabled);
   }
 
   @Override
@@ -56,22 +57,8 @@ public class FirstLongValueWithTimeAggregationFunction 
extends FirstWithTimeAggr
   }
 
   @Override
-  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet) {
-    ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair();
-    Long firstData = defaultValueLongPair.getValue();
-    long firstTime = defaultValueLongPair.getTime();
-    long[] longValues = blockValSet.getLongValuesSV();
-    long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      long data = longValues[i];
-      long time = timeValues[i];
-      if (time <= firstTime) {
-        firstTime = time;
-        firstData = data;
-      }
-    }
-    setAggregationResult(aggregationResultHolder, firstData, firstTime);
+  public Long readCell(BlockValSet block, int docId) {
+    return block.getLongValuesSV()[docId];
   }
 
   @Override
@@ -79,11 +66,15 @@ public class FirstLongValueWithTimeAggregationFunction 
extends FirstWithTimeAggr
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     long[] longValues = blockValSet.getLongValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      long data = longValues[i];
-      long time = timeValues[i];
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
-    }
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        long data = longValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    });
   }
 
   @Override
@@ -91,13 +82,17 @@ public class FirstLongValueWithTimeAggregationFunction 
extends FirstWithTimeAggr
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     long[] longValues = blockValSet.getLongValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      long value = longValues[i];
-      long time = timeValues[i];
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        long value = longValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
index c30f4bfacb..4fe34117d2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.StringLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -41,8 +41,9 @@ import 
org.apache.pinot.segment.local.customobject.ValueLongPair;
 public class FirstStringValueWithTimeAggregationFunction extends 
FirstWithTimeAggregationFunction<String> {
   private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new 
StringLongPair("", Long.MAX_VALUE);
 
-  public FirstStringValueWithTimeAggregationFunction(ExpressionContext 
dataCol, ExpressionContext timeCol) {
-    super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE);
+  public FirstStringValueWithTimeAggregationFunction(ExpressionContext 
dataCol, ExpressionContext timeCol,
+      boolean nullHandlingEnabled) {
+    super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE, 
nullHandlingEnabled);
   }
 
   @Override
@@ -56,22 +57,8 @@ public class FirstStringValueWithTimeAggregationFunction 
extends FirstWithTimeAg
   }
 
   @Override
-  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet) {
-    ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair();
-    String firstData = defaultValueLongPair.getValue();
-    long firstTime = defaultValueLongPair.getTime();
-    String[] stringValues = blockValSet.getStringValuesSV();
-    long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      String data = stringValues[i];
-      long time = timeValues[i];
-      if (time <= firstTime) {
-        firstTime = time;
-        firstData = data;
-      }
-    }
-    setAggregationResult(aggregationResultHolder, firstData, firstTime);
+  public String readCell(BlockValSet block, int docId) {
+    return block.getStringValuesSV()[docId];
   }
 
   @Override
@@ -79,11 +66,15 @@ public class FirstStringValueWithTimeAggregationFunction 
extends FirstWithTimeAg
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     String[] stringValues = blockValSet.getStringValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      String data = stringValues[i];
-      long time = timeValues[i];
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
-    }
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        String data = stringValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    });
   }
 
   @Override
@@ -91,13 +82,17 @@ public class FirstStringValueWithTimeAggregationFunction 
extends FirstWithTimeAg
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     String[] stringValues = blockValSet.getStringValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      String value = stringValues[i];
-      long time = timeValues[i];
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        String value = stringValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
index 7049d0f1db..3d04803fde 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
@@ -29,9 +29,11 @@ import 
org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -47,14 +49,13 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class FirstWithTimeAggregationFunction<V extends Comparable<V>>
-    extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+    extends NullableSingleInputAggregationFunction<ValueLongPair<V>, V> {
   protected final ExpressionContext _timeCol;
   private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> 
_objectSerDe;
 
-  public FirstWithTimeAggregationFunction(ExpressionContext dataCol,
-      ExpressionContext timeCol,
-      ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
-    super(dataCol);
+  public FirstWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol,
+      ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe, 
boolean nullHandlingEnabled) {
+    super(dataCol, nullHandlingEnabled);
     _timeCol = timeCol;
     _objectSerDe = objectSerDe;
   }
@@ -63,8 +64,7 @@ public abstract class FirstWithTimeAggregationFunction<V 
extends Comparable<V>>
 
   public abstract ValueLongPair<V> getDefaultValueTimePair();
 
-  public abstract void aggregateResultWithRawData(int length, 
AggregationResultHolder aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet);
+  public abstract V readCell(BlockValSet block, int docId);
 
   public abstract void aggregateGroupResultWithRawDataSv(int length,
       int[] groupKeyArray,
@@ -100,23 +100,45 @@ public abstract class FirstWithTimeAggregationFunction<V 
extends Comparable<V>>
     BlockValSet blockValSet = blockValSetMap.get(_expression);
     BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
     if (blockValSet.getValueType() != DataType.BYTES) {
-      aggregateResultWithRawData(length, aggregationResultHolder, blockValSet, 
blockTimeSet);
+      IntLongPair defaultPair = new IntLongPair(Integer.MIN_VALUE, 
Long.MAX_VALUE);
+      long[] timeValues = blockTimeSet.getLongValuesSV();
+
+      IntIterator nullIdxIterator = orNullIterator(blockValSet, blockTimeSet);
+      IntLongPair bestPair = foldNotNull(length, nullIdxIterator, defaultPair, 
(pair, from, to) -> {
+        IntLongPair actualPair = pair;
+        for (int i = from; i < to; i++) {
+          long time = timeValues[i];
+          if (time <= actualPair.getTime()) {
+            actualPair = new IntLongPair(i, time);
+          }
+        }
+        return actualPair;
+      });
+      V bestValue;
+      if (bestPair.getValue() < 0) {
+        bestValue = getDefaultValueTimePair().getValue();
+      } else {
+        bestValue = readCell(blockValSet, bestPair.getValue());
+      }
+      setAggregationResult(aggregationResultHolder, bestValue, 
bestPair.getTime());
     } else {
+      // We assume bytes contain the binary serialization of FirstPair
       ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair();
-      V firstData = defaultValueLongPair.getValue();
-      long firstTime = defaultValueLongPair.getTime();
-      // Serialized FirstPair
+
+      ValueLongPair<V> result = 
constructValueLongPair(defaultValueLongPair.getValue(), 
defaultValueLongPair.getTime());
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
-      for (int i = 0; i < length; i++) {
-        ValueLongPair<V> firstWithTimePair = 
_objectSerDe.deserialize(bytesValues[i]);
-        V data = firstWithTimePair.getValue();
-        long time = firstWithTimePair.getTime();
-        if (time <= firstTime) {
-          firstTime = time;
-          firstData = data;
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          ValueLongPair<V> firstWithTimePair = 
_objectSerDe.deserialize(bytesValues[i]);
+          long time = firstWithTimePair.getTime();
+          if (time < result.getTime()) {
+            result.setTime(time);
+            result.setValue(firstWithTimePair.getValue());
+          }
         }
-      }
-      setAggregationResult(aggregationResultHolder, firstData, firstTime);
+      });
+
+      setAggregationResult(aggregationResultHolder, result.getValue(), 
result.getTime());
     }
   }
 
@@ -138,13 +160,15 @@ public abstract class FirstWithTimeAggregationFunction<V 
extends Comparable<V>>
     } else {
       // Serialized FirstPair
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
-      for (int i = 0; i < length; i++) {
-        ValueLongPair<V> firstWithTimePair = 
_objectSerDe.deserialize(bytesValues[i]);
-        setGroupByResult(groupKeyArray[i],
-            groupByResultHolder,
-            firstWithTimePair.getValue(),
-            firstWithTimePair.getTime());
-      }
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          ValueLongPair<V> firstWithTimePair = 
_objectSerDe.deserialize(bytesValues[i]);
+          setGroupByResult(groupKeyArray[i],
+              groupByResultHolder,
+              firstWithTimePair.getValue(),
+              firstWithTimePair.getTime());
+        }
+      });
     }
   }
 
@@ -158,14 +182,16 @@ public abstract class FirstWithTimeAggregationFunction<V 
extends Comparable<V>>
     } else {
       // Serialized ValueTimePair
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
-      for (int i = 0; i < length; i++) {
-        ValueLongPair<V> firstWithTimePair = 
_objectSerDe.deserialize(bytesValues[i]);
-        V data = firstWithTimePair.getValue();
-        long time = firstWithTimePair.getTime();
-        for (int groupKey : groupKeysArray[i]) {
-          setGroupByResult(groupKey, groupByResultHolder, data, time);
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          ValueLongPair<V> firstWithTimePair = 
_objectSerDe.deserialize(bytesValues[i]);
+          V data = firstWithTimePair.getValue();
+          long time = firstWithTimePair.getTime();
+          for (int groupKey : groupKeysArray[i]) {
+            setGroupByResult(groupKey, groupByResultHolder, data, time);
+          }
         }
-      }
+      });
     }
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
index 93aec2b0c1..398455c799 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.DoubleLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -41,8 +41,9 @@ import 
org.apache.pinot.segment.local.customobject.ValueLongPair;
 public class LastDoubleValueWithTimeAggregationFunction extends 
LastWithTimeAggregationFunction<Double> {
   private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR = new 
DoubleLongPair(Double.NaN, Long.MIN_VALUE);
 
-  public LastDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol) {
-    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+  public LastDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol,
+      boolean nullHandlingEnabled) {
+    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE, 
nullHandlingEnabled);
   }
 
   @Override
@@ -56,22 +57,8 @@ public class LastDoubleValueWithTimeAggregationFunction 
extends LastWithTimeAggr
   }
 
   @Override
-  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet) {
-    ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
-    Double lastData = defaultValueLongPair.getValue();
-    long lastTime = defaultValueLongPair.getTime();
-    double[] doubleValues = blockValSet.getDoubleValuesSV();
-    long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      double data = doubleValues[i];
-      long time = timeValues[i];
-      if (time >= lastTime) {
-        lastTime = time;
-        lastData = data;
-      }
-    }
-    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  public Double readCell(BlockValSet block, int docId) {
+    return block.getDoubleValuesSV()[docId];
   }
 
   @Override
@@ -79,11 +66,15 @@ public class LastDoubleValueWithTimeAggregationFunction 
extends LastWithTimeAggr
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     double[] doubleValues = blockValSet.getDoubleValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      double data = doubleValues[i];
-      long time = timeValues[i];
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
-    }
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        double data = doubleValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    });
   }
 
   @Override
@@ -91,13 +82,17 @@ public class LastDoubleValueWithTimeAggregationFunction 
extends LastWithTimeAggr
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     double[] doubleValues = blockValSet.getDoubleValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      double value = doubleValues[i];
-      long time = timeValues[i];
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        double value = doubleValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
index 50635874c9..6d7eaaa172 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.FloatLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -41,8 +41,9 @@ import 
org.apache.pinot.segment.local.customobject.ValueLongPair;
 public class LastFloatValueWithTimeAggregationFunction extends 
LastWithTimeAggregationFunction<Float> {
   private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new 
FloatLongPair(Float.NaN, Long.MIN_VALUE);
 
-  public LastFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol) {
-    super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE);
+  public LastFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol,
+      boolean nullHandlingEnabled) {
+    super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE, 
nullHandlingEnabled);
   }
 
   @Override
@@ -56,22 +57,8 @@ public class LastFloatValueWithTimeAggregationFunction 
extends LastWithTimeAggre
   }
 
   @Override
-  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet) {
-    ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair();
-    Float lastData = defaultValueLongPair.getValue();
-    long lastTime = defaultValueLongPair.getTime();
-    float[] floatValues = blockValSet.getFloatValuesSV();
-    long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      float data = floatValues[i];
-      long time = timeValues[i];
-      if (time >= lastTime) {
-        lastTime = time;
-        lastData = data;
-      }
-    }
-    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  public Float readCell(BlockValSet block, int docId) {
+    return block.getFloatValuesSV()[docId];
   }
 
   @Override
@@ -79,11 +66,15 @@ public class LastFloatValueWithTimeAggregationFunction 
extends LastWithTimeAggre
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     float[] floatValues = blockValSet.getFloatValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      float data = floatValues[i];
-      long time = timeValues[i];
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
-    }
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        float data = floatValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    });
   }
 
   @Override
@@ -91,13 +82,17 @@ public class LastFloatValueWithTimeAggregationFunction 
extends LastWithTimeAggre
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     float[] floatValues = blockValSet.getFloatValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      float value = floatValues[i];
-      long time = timeValues[i];
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        float value = floatValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
index 87d33ce0df..e124f58d0e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.IntLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -46,8 +46,8 @@ public class LastIntValueWithTimeAggregationFunction extends 
LastWithTimeAggrega
   private final boolean _isBoolean;
 
   public LastIntValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol,
-      boolean isBoolean) {
-    super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE);
+      boolean nullHandlingEnabled, boolean isBoolean) {
+    super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE, 
nullHandlingEnabled);
     _isBoolean = isBoolean;
   }
 
@@ -62,22 +62,8 @@ public class LastIntValueWithTimeAggregationFunction extends 
LastWithTimeAggrega
   }
 
   @Override
-  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet) {
-    ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair();
-    Integer lastData = defaultValueLongPair.getValue();
-    long lastTime = defaultValueLongPair.getTime();
-    int[] intValues = blockValSet.getIntValuesSV();
-    long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      int data = intValues[i];
-      long time = timeValues[i];
-      if (time >= lastTime) {
-        lastTime = time;
-        lastData = data;
-      }
-    }
-    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  public Integer readCell(BlockValSet block, int docId) {
+    return block.getIntValuesSV()[docId];
   }
 
   @Override
@@ -85,11 +71,15 @@ public class LastIntValueWithTimeAggregationFunction 
extends LastWithTimeAggrega
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     int[] intValues = blockValSet.getIntValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      int data = intValues[i];
-      long time = timeValues[i];
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
-    }
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        int data = intValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    });
   }
 
   @Override
@@ -97,13 +87,17 @@ public class LastIntValueWithTimeAggregationFunction 
extends LastWithTimeAggrega
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     int[] intValues = blockValSet.getIntValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      int value = intValues[i];
-      long time = timeValues[i];
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        int value = intValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
index 248487d635..6c0e08e761 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.LongLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -41,8 +41,9 @@ import 
org.apache.pinot.segment.local.customobject.ValueLongPair;
 public class LastLongValueWithTimeAggregationFunction extends 
LastWithTimeAggregationFunction<Long> {
   private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR = new 
LongLongPair(Long.MIN_VALUE, Long.MIN_VALUE);
 
-  public LastLongValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol) {
-    super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE);
+  public LastLongValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol,
+      boolean nullHandlingEnabled) {
+    super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE, 
nullHandlingEnabled);
   }
 
   @Override
@@ -56,22 +57,8 @@ public class LastLongValueWithTimeAggregationFunction 
extends LastWithTimeAggreg
   }
 
   @Override
-  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet) {
-    ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair();
-    Long lastData = defaultValueLongPair.getValue();
-    long lastTime = defaultValueLongPair.getTime();
-    long[] longValues = blockValSet.getLongValuesSV();
-    long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      long data = longValues[i];
-      long time = timeValues[i];
-      if (time >= lastTime) {
-        lastTime = time;
-        lastData = data;
-      }
-    }
-    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  public Long readCell(BlockValSet block, int docId) {
+    return block.getLongValuesSV()[docId];
   }
 
   @Override
@@ -79,11 +66,15 @@ public class LastLongValueWithTimeAggregationFunction 
extends LastWithTimeAggreg
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     long[] longValues = blockValSet.getLongValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      long data = longValues[i];
-      long time = timeValues[i];
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
-    }
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        long data = longValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    });
   }
 
   @Override
@@ -91,13 +82,17 @@ public class LastLongValueWithTimeAggregationFunction 
extends LastWithTimeAggreg
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     long[] longValues = blockValSet.getLongValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      long value = longValues[i];
-      long time = timeValues[i];
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        long value = longValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
index 0f10e9399d..2b200b157a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.segment.local.customobject.StringLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -41,8 +41,9 @@ import 
org.apache.pinot.segment.local.customobject.ValueLongPair;
 public class LastStringValueWithTimeAggregationFunction extends 
LastWithTimeAggregationFunction<String> {
   private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new 
StringLongPair("", Long.MIN_VALUE);
 
-  public LastStringValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol) {
-    super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE);
+  public LastStringValueWithTimeAggregationFunction(ExpressionContext dataCol, 
ExpressionContext timeCol,
+      boolean nullHandlingEnabled) {
+    super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE, 
nullHandlingEnabled);
   }
 
   @Override
@@ -56,22 +57,8 @@ public class LastStringValueWithTimeAggregationFunction 
extends LastWithTimeAggr
   }
 
   @Override
-  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet) {
-    ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair();
-    String lastData = defaultValueLongPair.getValue();
-    long lastTime = defaultValueLongPair.getTime();
-    String[] stringValues = blockValSet.getStringValuesSV();
-    long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      String data = stringValues[i];
-      long time = timeValues[i];
-      if (time >= lastTime) {
-        lastTime = time;
-        lastData = data;
-      }
-    }
-    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  public String readCell(BlockValSet block, int docId) {
+    return block.getStringValuesSV()[docId];
   }
 
   @Override
@@ -79,11 +66,15 @@ public class LastStringValueWithTimeAggregationFunction 
extends LastWithTimeAggr
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     String[] stringValues = blockValSet.getStringValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      String data = stringValues[i];
-      long time = timeValues[i];
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
-    }
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        String data = stringValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    });
   }
 
   @Override
@@ -91,13 +82,17 @@ public class LastStringValueWithTimeAggregationFunction 
extends LastWithTimeAggr
       GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, 
BlockValSet timeValSet) {
     String[] stringValues = blockValSet.getStringValuesSV();
     long[] timeValues = timeValSet.getLongValuesSV();
-    for (int i = 0; i < length; i++) {
-      String value = stringValues[i];
-      long time = timeValues[i];
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+    IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+    forEachNotNull(length, nullIdxIterator, (from, to) -> {
+      for (int i = from; i < to; i++) {
+        String value = stringValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
index cd4b0576a6..565445ae6f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
@@ -29,9 +29,11 @@ import 
org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.IntIterator;
 
 
 /**
@@ -47,14 +49,14 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>>
-    extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+    extends NullableSingleInputAggregationFunction<ValueLongPair<V>, V> {
   protected final ExpressionContext _timeCol;
   private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> 
_objectSerDe;
 
   public LastWithTimeAggregationFunction(ExpressionContext dataCol,
       ExpressionContext timeCol,
-      ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
-    super(dataCol);
+      ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe, 
boolean nullHandlingEnabled) {
+    super(dataCol, nullHandlingEnabled);
     _timeCol = timeCol;
     _objectSerDe = objectSerDe;
   }
@@ -63,8 +65,7 @@ public abstract class LastWithTimeAggregationFunction<V 
extends Comparable<V>>
 
   public abstract ValueLongPair<V> getDefaultValueTimePair();
 
-  public abstract void aggregateResultWithRawData(int length, 
AggregationResultHolder aggregationResultHolder,
-      BlockValSet blockValSet, BlockValSet timeValSet);
+  public abstract V readCell(BlockValSet block, int docId);
 
   public abstract void aggregateGroupResultWithRawDataSv(int length,
       int[] groupKeyArray,
@@ -100,8 +101,29 @@ public abstract class LastWithTimeAggregationFunction<V 
extends Comparable<V>>
     BlockValSet blockValSet = blockValSetMap.get(_expression);
     BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
     if (blockValSet.getValueType() != DataType.BYTES) {
-      aggregateResultWithRawData(length, aggregationResultHolder, blockValSet, 
blockTimeSet);
+      IntLongPair defaultPair = new IntLongPair(Integer.MIN_VALUE, 
Long.MIN_VALUE);
+      long[] timeValues = blockTimeSet.getLongValuesSV();
+
+      IntIterator nullIdxIterator = orNullIterator(blockValSet, blockTimeSet);
+      IntLongPair bestPair = foldNotNull(length, nullIdxIterator, defaultPair, 
(pair, from, to) -> {
+        IntLongPair actualPair = pair;
+        for (int i = from; i < to; i++) {
+          long time = timeValues[i];
+          if (time >= actualPair.getTime()) {
+            actualPair = new IntLongPair(i, time);
+          }
+        }
+        return actualPair;
+      });
+      V bestValue;
+      if (bestPair.getValue() < 0) {
+        bestValue = getDefaultValueTimePair().getValue();
+      } else {
+        bestValue = readCell(blockValSet, bestPair.getValue());
+      }
+      setAggregationResult(aggregationResultHolder, bestValue, 
bestPair.getTime());
     } else {
+      // We assume bytes contain the binary serialization of FirstPair
       ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair();
       V lastData = defaultValueLongPair.getValue();
       long lastTime = defaultValueLongPair.getTime();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java
index 0a42db7442..78f1ae1269 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.core.common.BlockValSet;
@@ -138,4 +139,116 @@ public abstract class 
NullableSingleInputAggregationFunction<I, F extends Compar
     }
     return acum;
   }
+
+  public IntIterator orNullIterator(BlockValSet valSet1, BlockValSet valSet2) {
+    if (!_nullHandlingEnabled) {
+      return EmptyIntIterator.INSTANCE;
+    } else {
+      RoaringBitmap nullBlock1 = valSet1.getNullBitmap();
+      RoaringBitmap nullBlock2 = valSet2.getNullBitmap();
+      if (nullBlock1 == null) {
+        return nullBlock2 == null ? EmptyIntIterator.INSTANCE : 
nullBlock2.getIntIterator();
+      } else if (nullBlock2 == null) {
+        return nullBlock1.getIntIterator();
+      } else {
+        return new MinIntIterator(nullBlock1.getIntIterator(), 
nullBlock2.getIntIterator());
+      }
+    }
+  }
+
+  public static class EmptyIntIterator implements IntIterator {
+
+    public static final EmptyIntIterator INSTANCE = new EmptyIntIterator();
+
+    private EmptyIntIterator() {
+    }
+
+    @Override
+    public IntIterator clone() {
+      return this;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return false;
+    }
+
+    @Override
+    public int next() {
+      throw new NoSuchElementException();
+    }
+  }
+
+  public static class MinIntIterator implements IntIterator {
+    private final IntIterator _it1;
+    private final IntIterator _it2;
+    private int _next1 = -1;
+    private int _next2 = -1;
+
+    /**
+     * @param it1 it has to iterate in ascending order and the min value is 0
+     * @param it2 it has to iterate in ascending order and the min value is 0
+     */
+    public MinIntIterator(IntIterator it1, IntIterator it2) {
+      _it1 = it1;
+      _it2 = it2;
+    }
+
+    @Override
+    public IntIterator clone() {
+      return new MinIntIterator(_it1.clone(), _it2.clone());
+    }
+
+    @Override
+    public boolean hasNext() {
+      return _next1 > 0 || _next2 > 0 || _it1.hasNext() || _it2.hasNext();
+    }
+
+    @Override
+    public int next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (_next1 < 0) {
+        if (_it1.hasNext()) {
+          _next1 = _it1.next();
+        } else { //it1 is completely consumed
+          if (_next2 >= 0) { // consume the last cached value
+            return consume2();
+          } else { // after that, return all values from it2
+            return _it2.next();
+          }
+        }
+      }
+      if (_next2 < 0) {
+        if (_it2.hasNext()) {
+          _next2 = _it2.next();
+        } else { //it2 is completely consumed
+          if (_next1 >= 0) { // consume the last cached value
+            return consume1();
+          } else { // after that, return all values from it1
+            return _it1.next();
+          }
+        }
+      }
+      assert _next1 >= 0 && _next2 >= 0;
+      if (_next1 <= _next2) {
+        return consume1();
+      } else {
+        return consume2();
+      }
+    }
+
+    private int consume1() {
+      int nextVal = _next1;
+      _next1 = -1;
+      return nextVal;
+    }
+
+    private int consume2() {
+      int nextVal = _next2;
+      _next2 = -1;
+      return nextVal;
+    }
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunctionTest.java
new file mode 100644
index 0000000000..541b007d47
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunctionTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class FirstWithTimeAggregationFunctionTest extends 
AbstractAggregationFunctionTest {
+
+  @DataProvider(name = "scenarios")
+  Object[] scenarios() {
+    return new Object[] {
+        new Scenario(FieldSpec.DataType.INT, "1", "2", "-2147483648"),
+        new Scenario(FieldSpec.DataType.LONG, "1", "2", 
"-9223372036854775808"),
+        new Scenario(FieldSpec.DataType.FLOAT, "1", "2", "-Infinity"),
+        new Scenario(FieldSpec.DataType.DOUBLE, "1", "2", "-Infinity"),
+        new Scenario(FieldSpec.DataType.STRING, "a", "b", "\"null\""),
+    };
+  }
+
+  public class Scenario {
+    private final PinotDataType _pinotDataType;
+    private final FieldSpec.DataType _dataType;
+    private final String _valAsStr1;
+    private final String _valAsStr2;
+    private final String _defaultNullValue;
+
+    public Scenario(FieldSpec.DataType dataType, String valAsStr1, String 
valAsStr2, String defaultNullValue) {
+      _dataType = dataType;
+      _valAsStr1 = valAsStr1;
+      _valAsStr2 = valAsStr2;
+      _defaultNullValue = defaultNullValue;
+      _pinotDataType =
+          _dataType == FieldSpec.DataType.INT ? PinotDataType.INTEGER : 
PinotDataType.valueOf(_dataType.name());
+    }
+
+    public FluentQueryTest.DeclaringTable getDeclaringTable(boolean 
nullHandlingEnabled) {
+      Schema schema = new Schema.SchemaBuilder()
+          .setSchemaName("testTable")
+          .setEnableColumnBasedNullHandling(true)
+          .addDimensionField("myField", _dataType, f -> f.setNullable(true))
+          .addDimensionField("timeField", FieldSpec.DataType.TIMESTAMP)
+          .build();
+      TableConfigBuilder tableConfigBuilder = new 
TableConfigBuilder(TableType.OFFLINE)
+          .setTableName("testTable");
+
+      return FluentQueryTest.withBaseDir(_baseDir)
+          .withNullHandling(nullHandlingEnabled)
+          .givenTable(schema, tableConfigBuilder.build());
+    }
+
+    @Override
+    public String toString() {
+      return "Scenario{" + "dt=" + _dataType + ", val1='" + _valAsStr1 + '\'' 
+ ", val2='"
+          + _valAsStr2 + '\'' + '}';
+    }
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            "null                   | 1",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null                   | 4",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        )
+        .whenQuery("select FIRST_WITH_TIME(myField, timeField, '" + 
scenario._dataType + "') from testTable")
+        .thenResultIs(scenario._pinotDataType.name(), 
scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            "null                   | 1",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null                   | 4",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        )
+        .whenQuery("select FIRST_WITH_TIME(myField, timeField, '" + 
scenario._dataType + "') from testTable")
+        .thenResultIs(scenario._pinotDataType.name(), scenario._valAsStr1);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            "null                   | 1",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null                   | 4",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        ).whenQuery("select 'cte', FIRST_WITH_TIME(myField, timeField, '" + 
scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " + 
scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            "null                   | 1",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null                   | 4",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        ).whenQuery("select 'cte', FIRST_WITH_TIME(myField, timeField, '" + 
scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " + 
scenario._valAsStr1);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrMvWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            "null                   | 1",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null                   | 4",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+            + "FIRST_WITH_TIME(myField, timeField, '" + scenario._dataType + 
"') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+            "cte1 | cte2 | " + scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrMvWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            "null                   | 1",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null                   | 4",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+            + "FIRST_WITH_TIME(myField, timeField, '" + scenario._dataType + 
"') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+            "cte1 | cte2 | " + scenario._valAsStr1);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunctionTest.java
new file mode 100644
index 0000000000..9c612ae538
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunctionTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class LastWithTimeAggregationFunctionTest extends 
AbstractAggregationFunctionTest {
+
+  @DataProvider(name = "scenarios")
+  Object[] scenarios() {
+    return new Object[] {
+        new Scenario(FieldSpec.DataType.INT, "1", "2", "-2147483648"),
+        new Scenario(FieldSpec.DataType.LONG, "1", "2", 
"-9223372036854775808"),
+        new Scenario(FieldSpec.DataType.FLOAT, "1", "2", "-Infinity"),
+        new Scenario(FieldSpec.DataType.DOUBLE, "1", "2", "-Infinity"),
+        new Scenario(FieldSpec.DataType.STRING, "a", "b", "\"null\""),
+    };
+  }
+
+  public class Scenario {
+    private final PinotDataType _pinotDataType;
+    private final FieldSpec.DataType _dataType;
+    private final String _valAsStr1;
+    private final String _valAsStr2;
+    private final String _defaultNullValue;
+
+    public Scenario(FieldSpec.DataType dataType, String valAsStr1, String 
valAsStr2, String defaultNullValue) {
+      _dataType = dataType;
+      _valAsStr1 = valAsStr1;
+      _valAsStr2 = valAsStr2;
+      _defaultNullValue = defaultNullValue;
+      _pinotDataType =
+          _dataType == FieldSpec.DataType.INT ? PinotDataType.INTEGER : 
PinotDataType.valueOf(_dataType.name());
+    }
+
+    public FluentQueryTest.DeclaringTable getDeclaringTable(boolean 
nullHandlingEnabled) {
+      Schema schema = new Schema.SchemaBuilder()
+          .setSchemaName("testTable")
+          .setEnableColumnBasedNullHandling(true)
+          .addDimensionField("myField", _dataType, f -> f.setNullable(true))
+          .addDimensionField("timeField", FieldSpec.DataType.TIMESTAMP)
+          .build();
+      TableConfigBuilder tableConfigBuilder = new 
TableConfigBuilder(TableType.OFFLINE)
+          .setTableName("testTable");
+
+      return FluentQueryTest.withBaseDir(_baseDir)
+          .withNullHandling(nullHandlingEnabled)
+          .givenTable(schema, tableConfigBuilder.build());
+    }
+
+    @Override
+    public String toString() {
+      return "Scenario{" + "dt=" + _dataType + ", val1='" + _valAsStr1 + '\'' 
+ ", val2='"
+          + _valAsStr2 + '\'' + '}';
+    }
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        )
+        .whenQuery("select LAST_WITH_TIME(myField, timeField, '" + 
scenario._dataType + "') from testTable")
+        .thenResultIs(scenario._pinotDataType.name(), 
scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        )
+        .whenQuery("select LAST_WITH_TIME(myField, timeField, '" + 
scenario._dataType + "') from testTable")
+        .thenResultIs(scenario._pinotDataType.name(), scenario._valAsStr2);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        ).whenQuery("select 'cte', LAST_WITH_TIME(myField, timeField, '" + 
scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " + 
scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        ).whenQuery("select 'cte', LAST_WITH_TIME(myField, timeField, '" + 
scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " + 
scenario._valAsStr2);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrMvWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+            + "LAST_WITH_TIME(myField, timeField, '" + scenario._dataType + 
"') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+            "cte1 | cte2 | " + scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrMvWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null                   | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null                   | 6"
+        ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+            + "LAST_WITH_TIME(myField, timeField, '" + scenario._dataType + 
"') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+            "cte1 | cte2 | " + scenario._valAsStr2);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
index 81e2ded523..d6c3cf82fb 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
@@ -35,6 +35,14 @@ public abstract class ValueLongPair<V extends Comparable<V>> 
implements Comparab
     return _time;
   }
 
+  public void setValue(V value) {
+    _value = value;
+  }
+
+  public void setTime(long time) {
+    _time = time;
+  }
+
   abstract public byte[] toBytes();
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to