This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b3172464ce add FIRSTWITHTIME aggregate function support #7647 (#8181)
b3172464ce is described below

commit b3172464cedff45a1af2be59db0822a6aeb6f403
Author: Yash Agarwal <yash.0...@gmail.com>
AuthorDate: Wed May 4 23:38:15 2022 +0530

    add FIRSTWITHTIME aggregate function support #7647 (#8181)
---
 .../function/AggregationFunctionTypeTest.java      |   2 +
 .../function/AggregationFunctionFactory.java       |  30 ++
 ...irstDoubleValueWithTimeAggregationFunction.java | 126 ++++++
 ...FirstFloatValueWithTimeAggregationFunction.java | 127 ++++++
 .../FirstIntValueWithTimeAggregationFunction.java  | 142 +++++++
 .../FirstLongValueWithTimeAggregationFunction.java | 126 ++++++
 ...irstStringValueWithTimeAggregationFunction.java | 124 ++++++
 .../function/FirstWithTimeAggregationFunction.java | 222 +++++++++++
 .../function/AggregationFunctionFactoryTest.java   |  42 ++
 .../pinot/queries/FirstWithTimeQueriesTest.java    | 434 +++++++++++++++++++++
 .../pinot/segment/spi/AggregationFunctionType.java |   1 +
 11 files changed, 1376 insertions(+)

diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
index ea9f69a815..f7a9ca16b0 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
@@ -33,6 +33,8 @@ public class AggregationFunctionTypeTest {
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("SuM"), 
AggregationFunctionType.SUM);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("AvG"), 
AggregationFunctionType.AVG);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MoDe"), 
AggregationFunctionType.MODE);
+    
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("FiRsTwItHtImE"),
+            AggregationFunctionType.FIRSTWITHTIME);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("LaStWiThTiMe"),
             AggregationFunctionType.LASTWITHTIME);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmAxRaNgE"),
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 74c815edf5..fc684b434d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -160,6 +160,36 @@ public class AggregationFunctionFactory {
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case FIRSTWITHTIME:
+            if (arguments.size() == 3) {
+              ExpressionContext timeCol = arguments.get(1);
+              ExpressionContext dataType = arguments.get(2);
+              if (dataType.getType() != ExpressionContext.Type.LITERAL) {
+                throw new IllegalArgumentException("Third argument of 
firstWithTime Function should be literal."
+                    + " The function can be used as firstWithTime(dataColumn, 
timeColumn, 'dataType')");
+              }
+              FieldSpec.DataType fieldDataType
+                  = 
FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase());
+              switch (fieldDataType) {
+                case BOOLEAN:
+                case INT:
+                  return new FirstIntValueWithTimeAggregationFunction(
+                      firstArgument, timeCol, fieldDataType == 
FieldSpec.DataType.BOOLEAN);
+                case LONG:
+                  return new 
FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol);
+                case FLOAT:
+                  return new 
FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
+                case DOUBLE:
+                  return new 
FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
+                case STRING:
+                  return new 
FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol);
+                default:
+                  throw new IllegalArgumentException("Unsupported Value Type 
for firstWithTime Function:" + dataType);
+              }
+            } else {
+              throw new IllegalArgumentException("Three arguments are required 
for firstWithTime Function."
+                  + " The function can be used as firstWithTime(dataColumn, 
timeColumn, 'dataType')");
+            }
           case LASTWITHTIME:
             if (arguments.size() == 3) {
               ExpressionContext timeCol = arguments.get(1);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..56aff397df
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for FirstWithTime calculations on a data column of double type.
+ * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'double')
+ * <p>The following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression for the double data column on which the first value is calculated</li>
+ *   <li>timeExpression: expression for the column used to decide which data is first; can be any
+ *   numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class FirstDoubleValueWithTimeAggregationFunction extends 
FirstWithTimeAggregationFunction<Double> {
+
+  private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR
+      = new DoubleLongPair(Double.NaN, Long.MAX_VALUE);
+
+  public FirstDoubleValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Double> constructValueLongPair(Double value, long time) 
{
+    return new DoubleLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
+    Double firstData = defaultValueLongPair.getValue();
+    long firstTime = defaultValueLongPair.getTime();
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time <= firstTime) {
+        firstTime = time;
+        firstData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, firstData, firstTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] 
groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double value = doubleValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + _expression + "," + 
_timeCol + ",'DOUBLE')";
+  }
+
+  @Override
+  public String getColumnName() {
+    return getType().getName() + "_" + _expression + "_" + _timeCol + 
"_DOUBLE";
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.DOUBLE;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..3801ff17a3
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.FloatLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for FirstWithTime calculations on a data column of float type.
+ * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'float')
+ * <p>The following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression for the float data column on which the first value is calculated</li>
+ *   <li>timeExpression: expression for the column used to decide which data is first; can be any
+ *   numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class FirstFloatValueWithTimeAggregationFunction extends 
FirstWithTimeAggregationFunction<Float> {
+
+  private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new 
FloatLongPair(Float.NaN, Long.MAX_VALUE);
+
+  public FirstFloatValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Float> constructValueLongPair(Float value, long time) {
+    return new FloatLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Float> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair();
+    Float firstData = defaultValueLongPair.getValue();
+    long firstTime = defaultValueLongPair.getTime();
+    float[] floatValues = blockValSet.getFloatValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      float data = floatValues[i];
+      long time = timeValues[i];
+      if (time <= firstTime) {
+        firstTime = time;
+        firstData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, firstData, firstTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length,
+      int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    float[] floatValues = blockValSet.getFloatValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      float data = floatValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    float[] floatValues = blockValSet.getFloatValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      float value = floatValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + _expression + "," + 
_timeCol + ",'FLOAT')";
+  }
+
+  @Override
+  public String getColumnName() {
+    return getType().getName() + "_" + _expression + "_" + _timeCol + "_FLOAT";
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.FLOAT;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..be151b9994
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for FirstWithTime calculations on a data column of int/boolean type.
+ * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'int')
+ * or FirstWithTime(dataExpression, timeExpression, 'boolean')
+ * <p>The following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression for the int/boolean data column on which the first value is calculated</li>
+ *   <li>timeExpression: expression for the column used to decide which data is first; can be any
+ *   numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class FirstIntValueWithTimeAggregationFunction extends 
FirstWithTimeAggregationFunction<Integer> {
+
+  private final static ValueLongPair<Integer> DEFAULT_VALUE_TIME_PAIR
+      = new IntLongPair(Integer.MIN_VALUE, Long.MAX_VALUE);
+  private final boolean _isBoolean;
+
+  public FirstIntValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol,
+      boolean isBoolean) {
+    super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE);
+    _isBoolean = isBoolean;
+  }
+
+  @Override
+  public ValueLongPair<Integer> constructValueLongPair(Integer value, long 
time) {
+    return new IntLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Integer> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair();
+    Integer firstData = defaultValueLongPair.getValue();
+    long firstTime = defaultValueLongPair.getTime();
+    int[] intValues = blockValSet.getIntValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      int data = intValues[i];
+      long time = timeValues[i];
+      if (time <= firstTime) {
+        firstTime = time;
+        firstData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, firstData, firstTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] 
groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      int data = intValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      int value = intValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    if (_isBoolean) {
+      return getType().getName().toLowerCase() + "(" + _expression + "," + 
_timeCol + ",'BOOLEAN')";
+    } else {
+      return getType().getName().toLowerCase() + "(" + _expression + "," + 
_timeCol + ",'INT')";
+    }
+  }
+
+  @Override
+  public String getColumnName() {
+    if (_isBoolean) {
+      return getType().getName() + "_" + _expression + "_" + _timeCol + 
"_BOOLEAN";
+    } else {
+      return getType().getName() + "_" + _expression + "_" + _timeCol + "_INT";
+    }
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    if (_isBoolean) {
+      return ColumnDataType.BOOLEAN;
+    } else {
+      return ColumnDataType.INT;
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..960cda8820
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LongLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for FirstWithTime calculations on a data column of long type.
+ * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'long')
+ * <p>The following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression for the long data column on which the first value is calculated</li>
+ *   <li>timeExpression: expression for the column used to decide which data is first; can be any
+ *   numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class FirstLongValueWithTimeAggregationFunction extends 
FirstWithTimeAggregationFunction<Long> {
+
+  private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR
+      = new LongLongPair(Long.MIN_VALUE, Long.MAX_VALUE);
+
+  public FirstLongValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Long> constructValueLongPair(Long value, long time) {
+    return new LongLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Long> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair();
+    Long firstData = defaultValueLongPair.getValue();
+    long firstTime = defaultValueLongPair.getTime();
+    long[] longValues = blockValSet.getLongValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      long data = longValues[i];
+      long time = timeValues[i];
+      if (time <= firstTime) {
+        firstTime = time;
+        firstData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, firstData, firstTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] 
groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      long data = longValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      long value = longValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + _expression + "," + 
_timeCol + ",'LONG')";
+  }
+
+  @Override
+  public String getColumnName() {
+    return getType().getName() + "_" + _expression + "_" + _timeCol + "_LONG";
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..b0ace7487e
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for FirstWithTime calculations on a data column of string type.
+ * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'string')
+ * <p>The following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression for the string data column on which the first value is calculated</li>
+ *   <li>timeExpression: expression for the column used to decide which data is first; can be any
+ *   numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class FirstStringValueWithTimeAggregationFunction extends 
FirstWithTimeAggregationFunction<String> {
+  private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new 
StringLongPair("", Long.MAX_VALUE);
+
+  public FirstStringValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<String> constructValueLongPair(String value, long time) 
{
+    return new StringLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<String> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder 
aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair();
+    String firstData = defaultValueLongPair.getValue();
+    long firstTime = defaultValueLongPair.getTime();
+    String[] stringValues = blockValSet.getStringValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      String data = stringValues[i];
+      long time = timeValues[i];
+      if (time <= firstTime) {
+        firstTime = time;
+        firstData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, firstData, firstTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] 
groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      String data = stringValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      String value = stringValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + _expression + "," + 
_timeCol + ",'STRING')";
+  }
+
+  @Override
+  public String getColumnName() {
+    return getType().getName() + "_" + _expression + "_" + _timeCol + 
"_STRING";
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..7049d0f1db
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Aggregation function for FirstWithTime calculations: it returns the data value paired with the smallest time.
+ * <p>The function can be used as {@code FirstWithTime(dataExpression, timeExpression, 'dataType')}.
+ * <p>The following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that yields the values from which the first one is picked</li>
+ *   <li>timeExpression: expression that yields the time used to decide which value is first, can be any
+ *   Numeric column</li>
+ *   <li>dataType: the data type of the data column</li>
+ * </ul>
+ * <p>Ties on the time value: while rows are aggregated the comparison is {@code <=}, so among rows sharing the
+ * smallest time the one processed last is kept; {@link #merge} keeps its first argument on a tie.
+ *
+ * @param <V> type of the data value carried next to the time
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public abstract class FirstWithTimeAggregationFunction<V extends Comparable<V>>
+    extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+  /** Expression that supplies the time of each row. */
+  protected final ExpressionContext _timeCol;
+  /** Deserializer used when the data expression already holds serialized value/time pairs (BYTES input). */
+  private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> _objectSerDe;
+
+  /**
+   * @param dataCol expression that supplies the data values
+   * @param timeCol expression that supplies the time values
+   * @param objectSerDe deserializer for serialized value/time pairs
+   */
+  public FirstWithTimeAggregationFunction(ExpressionContext dataCol,
+      ExpressionContext timeCol,
+      ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
+    super(dataCol);
+    _timeCol = timeCol;
+    _objectSerDe = objectSerDe;
+  }
+
+  /** Creates the type-specific pair holding the given value and time. */
+  public abstract ValueLongPair<V> constructValueLongPair(V value, long time);
+
+  /**
+   * Returns the pair used as the starting point when scanning serialized input and as the extracted result when a
+   * holder contains nothing.
+   */
+  public abstract ValueLongPair<V> getDefaultValueTimePair();
+
+  /** Aggregates {@code length} rows of non-BYTES data values and their times into the aggregation holder. */
+  public abstract void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet);
+
+  /** Aggregates {@code length} rows of non-BYTES data values into the group-by holder, one group key per row. */
+  public abstract void aggregateGroupResultWithRawDataSv(int length,
+      int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet);
+
+  /** Aggregates {@code length} rows of non-BYTES data values into the group-by holder, several keys per row. */
+  public abstract void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet);
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FIRSTWITHTIME;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Aggregates one block without grouping. Non-BYTES input is delegated to
+   * {@link #aggregateResultWithRawData}; BYTES input is treated as serialized value/time pairs.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateResultWithRawData(length, aggregationResultHolder, blockValSet, blockTimeSet);
+    } else {
+      ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair();
+      V firstData = defaultValueLongPair.getValue();
+      long firstTime = defaultValueLongPair.getTime();
+      // Serialized FirstPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        V data = firstWithTimePair.getValue();
+        long time = firstWithTimePair.getTime();
+        // "<=" (not "<"): a later row with the same time replaces the current candidate
+        if (time <= firstTime) {
+          firstTime = time;
+          firstData = data;
+        }
+      }
+      setAggregationResult(aggregationResultHolder, firstData, firstTime);
+    }
+  }
+
+  /**
+   * Stores the given value/time in the holder when the holder is empty or the given time is not later than the
+   * stored one.
+   */
+  protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, V data, long time) {
+    ValueLongPair<V> firstWithTimePair = aggregationResultHolder.getResult();
+    if (firstWithTimePair == null || time <= firstWithTimePair.getTime()) {
+      aggregationResultHolder.setValue(constructValueLongPair(data, time));
+    }
+  }
+
+  /**
+   * Aggregates one block with a single group key per row. Non-BYTES input is delegated to
+   * {@link #aggregateGroupResultWithRawDataSv}; BYTES input is treated as serialized value/time pairs.
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateGroupResultWithRawDataSv(length, groupKeyArray, groupByResultHolder, blockValSet, timeValSet);
+    } else {
+      // Serialized FirstPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, firstWithTimePair.getValue(),
+            firstWithTimePair.getTime());
+      }
+    }
+  }
+
+  /**
+   * Aggregates one block with several group keys per row. Non-BYTES input is delegated to
+   * {@link #aggregateGroupResultWithRawDataMv}; BYTES input is treated as serialized value/time pairs.
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateGroupResultWithRawDataMv(length, groupKeysArray, groupByResultHolder, blockValSet, timeValSet);
+    } else {
+      // Serialized ValueTimePair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        V data = firstWithTimePair.getValue();
+        long time = firstWithTimePair.getTime();
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, data, time);
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores the given value/time for the group when the group has no result yet or the given time is not later than
+   * the stored one.
+   */
+  protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, V data, long time) {
+    ValueLongPair<V> firstWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (firstWithTimePair == null || time <= firstWithTimePair.getTime()) {
+      groupByResultHolder.setValueForKey(groupKey, constructValueLongPair(data, time));
+    }
+  }
+
+  /** Returns the pair stored in the holder, or the default pair when the holder is empty. */
+  @Override
+  public ValueLongPair<V> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    ValueLongPair<V> firstWithTimePair = aggregationResultHolder.getResult();
+    if (firstWithTimePair == null) {
+      return getDefaultValueTimePair();
+    } else {
+      return firstWithTimePair;
+    }
+  }
+
+  /** Returns the pair stored for the group, or the default pair when the group has no result. */
+  @Override
+  public ValueLongPair<V> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    ValueLongPair<V> firstWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (firstWithTimePair == null) {
+      return getDefaultValueTimePair();
+    } else {
+      return firstWithTimePair;
+    }
+  }
+
+  /** Returns whichever intermediate result has the smaller time; the first argument wins a tie. */
+  @Override
+  public ValueLongPair<V> merge(ValueLongPair<V> intermediateResult1, ValueLongPair<V> intermediateResult2) {
+    if (intermediateResult1.getTime() <= intermediateResult2.getTime()) {
+      return intermediateResult1;
+    } else {
+      return intermediateResult2;
+    }
+  }
+
+  /** Returns the data expression followed by the time expression. */
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol);
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  /** Drops the time and returns only the data value of the intermediate result. */
+  @Override
+  public V extractFinalResult(ValueLongPair<V> intermediateResult) {
+    return intermediateResult.getValue();
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index d51d4704f6..58deac76ee 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -87,6 +87,48 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getColumnName(), "mode_column");
     assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
 
+    function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'BOOLEAN')");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
FirstIntValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.FIRSTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), 
"firstWithTime_column_timeColumn_BOOLEAN");
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
+    function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'INT')");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
FirstIntValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.FIRSTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), 
"firstWithTime_column_timeColumn_INT");
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
+    function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'LONG')");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
FirstLongValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.FIRSTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), 
"firstWithTime_column_timeColumn_LONG");
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
+    function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'FLOAT')");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
FirstFloatValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.FIRSTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), 
"firstWithTime_column_timeColumn_FLOAT");
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
+    function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'DOUBLE')");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
FirstDoubleValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.FIRSTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), 
"firstWithTime_column_timeColumn_DOUBLE");
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
+    function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'STRING')");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
FirstStringValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.FIRSTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), 
"firstWithTime_column_timeColumn_STRING");
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
     function = getFunction("LaStWiThTiMe", "(column,timeColumn,'BOOLEAN')");
     aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
     assertTrue(aggregationFunction instanceof 
LastIntValueWithTimeAggregationFunction);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/FirstWithTimeQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/FirstWithTimeQueriesTest.java
new file mode 100644
index 0000000000..e7be59a2c8
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/FirstWithTimeQueriesTest.java
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.FloatLongPair;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
+import org.apache.pinot.segment.local.customobject.LongLongPair;
+import org.apache.pinot.segment.local.customobject.StringLongPair;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Queries test for FIRSTWITHTIME queries.
+ */
+public class FirstWithTimeQueriesTest extends BaseQueriesTest {
+  // Directory (under the system temp directory) into which setUp() writes the test segment.
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"FirstQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final Random RANDOM = new Random();
+
+  // Number of rows generated by setUp(), and exclusive upper bound of the random intColumn values.
+  private static final int NUM_RECORDS = 2000;
+  private static final int MAX_VALUE = 1000;
+
+  private static final String BOOL_COLUMN = "boolColumn";
+  private static final String BOOL_NO_DICT_COLUMN = "boolNoDictColumn";
+  private static final String INT_COLUMN = "intColumn";
+  private static final String INT_MV_COLUMN = "intMVColumn";
+  private static final String INT_NO_DICT_COLUMN = "intNoDictColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String LONG_NO_DICT_COLUMN = "longNoDictColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String FLOAT_NO_DICT_COLUMN = "floatNoDictColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String DOUBLE_NO_DICT_COLUMN = "doubleNoDictColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String STRING_NO_DICT_COLUMN = "stringNoDictColumn";
+  private static final String TIME_COLUMN = "timestampColumn";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .addSingleValueDimension(BOOL_COLUMN, DataType.BOOLEAN)
+      .addSingleValueDimension(BOOL_NO_DICT_COLUMN, DataType.BOOLEAN)
+      .addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(INT_MV_COLUMN, DataType.INT)
+      .addSingleValueDimension(INT_NO_DICT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, DataType.LONG)
+      .addSingleValueDimension(LONG_NO_DICT_COLUMN, DataType.LONG)
+      .addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(FLOAT_NO_DICT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE)
+      .addSingleValueDimension(DOUBLE_NO_DICT_COLUMN, DataType.DOUBLE)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(STRING_NO_DICT_COLUMN, DataType.STRING)
+      .addSingleValueDimension(TIME_COLUMN, DataType.LONG).build();
+  // NOTE(review): BOOL_NO_DICT_COLUMN and STRING_NO_DICT_COLUMN are not in the no-dictionary list below, so
+  // presumably they are built like the regular columns -- confirm whether that is intended.
+  private static final TableConfig TABLE_CONFIG = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+      .setNoDictionaryColumns(
+          Lists.newArrayList(INT_NO_DICT_COLUMN, LONG_NO_DICT_COLUMN, 
FLOAT_NO_DICT_COLUMN, DOUBLE_NO_DICT_COLUMN))
+      .build();
+
+  // Values of the row generated with the smallest timestamp (row 0 in setUp()).
+  private Boolean _expectedResultFirstBoolean;
+  private Integer _expectedResultFirstInt;
+  private Long _expectedResultFirstLong;
+  private Float _expectedResultFirstFloat;
+  private Double _expectedResultFirstDouble;
+  private String _expectedResultFirstString;
+  // Keyed by the generated int value: the values of the first generated row that carries that int value.
+  private Map<Integer, Boolean> _boolGroupValues;
+  private Map<Integer, Integer> _intGroupValues;
+  private Map<Integer, Long> _longGroupValues;
+  private Map<Integer, Float> _floatGroupValues;
+  private Map<Integer, Double> _doubleGroupValues;
+  private Map<Integer, String> _stringGroupValues;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  /** Returns an empty filter string. */
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  /** Returns the single segment loaded in {@code setUp()}. */
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  /** Returns the segment list built in {@code setUp()}, which holds the same loaded segment twice. */
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  /**
+   * Generates {@code NUM_RECORDS} random rows whose timestamp equals the row index, records the expected
+   * "first" values (overall and per int value), then builds and loads the test segment.
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> rows = new ArrayList<>(NUM_RECORDS);
+    _boolGroupValues = new HashMap<>();
+    _intGroupValues = new HashMap<>();
+    _longGroupValues = new HashMap<>();
+    _floatGroupValues = new HashMap<>();
+    _doubleGroupValues = new HashMap<>();
+    _stringGroupValues = new HashMap<>();
+    for (int rowId = 0; rowId < NUM_RECORDS; rowId++) {
+      boolean randomBool = RANDOM.nextBoolean();
+      int randomInt = RANDOM.nextInt(MAX_VALUE);
+      long randomLong = RANDOM.nextLong();
+      float randomFloat = RANDOM.nextFloat();
+      double randomDouble = RANDOM.nextDouble();
+      String randomString = String.valueOf(RANDOM.nextDouble());
+
+      GenericRow row = new GenericRow();
+      row.putValue(BOOL_COLUMN, randomBool);
+      row.putValue(BOOL_NO_DICT_COLUMN, randomBool);
+      row.putValue(INT_COLUMN, randomInt);
+      row.putValue(INT_MV_COLUMN, new Integer[]{randomInt, randomInt});
+      row.putValue(INT_NO_DICT_COLUMN, randomInt);
+      row.putValue(LONG_COLUMN, randomLong);
+      row.putValue(LONG_NO_DICT_COLUMN, randomLong);
+      row.putValue(FLOAT_COLUMN, randomFloat);
+      row.putValue(FLOAT_NO_DICT_COLUMN, randomFloat);
+      row.putValue(DOUBLE_COLUMN, randomDouble);
+      row.putValue(DOUBLE_NO_DICT_COLUMN, randomDouble);
+      row.putValue(STRING_COLUMN, randomString);
+      row.putValue(STRING_NO_DICT_COLUMN, randomString);
+      // The timestamp is the row index, so row 0 is the overall "first" row
+      row.putValue(TIME_COLUMN, (long) rowId);
+
+      boolean isEarliestRow = rowId == 0;
+      if (isEarliestRow) {
+        _expectedResultFirstBoolean = randomBool;
+        _expectedResultFirstInt = randomInt;
+        _expectedResultFirstLong = randomLong;
+        _expectedResultFirstFloat = randomFloat;
+        _expectedResultFirstDouble = randomDouble;
+        _expectedResultFirstString = randomString;
+      }
+      // Only the first row seen for an int value is remembered as that group's expected result
+      _boolGroupValues.putIfAbsent(randomInt, randomBool);
+      _intGroupValues.putIfAbsent(randomInt, randomInt);
+      _longGroupValues.putIfAbsent(randomInt, randomLong);
+      _floatGroupValues.putIfAbsent(randomInt, randomFloat);
+      _doubleGroupValues.putIfAbsent(randomInt, randomDouble);
+      _stringGroupValues.putIfAbsent(randomInt, randomString);
+      rows.add(row);
+    }
+
+    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    generatorConfig.setTableName(RAW_TABLE_NAME);
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+    generatorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl creationDriver = new SegmentIndexCreationDriverImpl();
+    creationDriver.init(generatorConfig, new GenericRowRecordReader(rows));
+    creationDriver.build();
+
+    File segmentDir = new File(INDEX_DIR, SEGMENT_NAME);
+    ImmutableSegment loadedSegment = ImmutableSegmentLoader.load(segmentDir, ReadMode.mmap);
+    _indexSegment = loadedSegment;
+    _indexSegments = Arrays.asList(loadedSegment, loadedSegment);
+  }
+
+  /**
+   * Runs FIRSTWITHTIME without GROUP BY on the regular (non-"NoDict") columns and checks both the single-segment
+   * operator output and the response returned by {@code getBrokerResponse}.
+   */
+  @Test
+  public void testAggregationOnly() {
+    String sql = "SELECT " + String.join(", ",
+        "FIRSTWITHTIME(boolColumn, timestampColumn, 'BOOLEAN')",
+        "FIRSTWITHTIME(intColumn, timestampColumn, 'INT')",
+        "FIRSTWITHTIME(longColumn, timestampColumn, 'LONG')",
+        "FIRSTWITHTIME(floatColumn, timestampColumn, 'FLOAT')",
+        "FIRSTWITHTIME(doubleColumn, timestampColumn, 'DOUBLE')",
+        "FIRSTWITHTIME(stringColumn, timestampColumn, 'STRING')") + " FROM testTable";
+
+    // Inner segment
+    AggregationOperator operator = getOperator(sql);
+    IntermediateResultsBlock block = operator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0,
+        7 * NUM_RECORDS, NUM_RECORDS);
+    List<Object> pairs = block.getAggregationResult();
+    assertNotNull(pairs);
+    // Booleans come back as an int pair; any non-zero value stands for true
+    IntLongPair boolPair = (IntLongPair) pairs.get(0);
+    assertEquals(boolPair.getValue() != 0, _expectedResultFirstBoolean.booleanValue());
+    IntLongPair intPair = (IntLongPair) pairs.get(1);
+    assertEquals(intPair.getValue(), _expectedResultFirstInt);
+    LongLongPair longPair = (LongLongPair) pairs.get(2);
+    assertEquals(longPair.getValue(), _expectedResultFirstLong);
+    FloatLongPair floatPair = (FloatLongPair) pairs.get(3);
+    assertEquals(floatPair.getValue(), _expectedResultFirstFloat);
+    DoubleLongPair doublePair = (DoubleLongPair) pairs.get(4);
+    assertEquals(doublePair.getValue(), _expectedResultFirstDouble);
+    StringLongPair stringPair = (StringLongPair) pairs.get(5);
+    assertEquals(stringPair.getValue(), _expectedResultFirstString);
+
+    // Inter segments (execution statistics are expected to be 4 * the inner segment ones)
+    BrokerResponseNative response = getBrokerResponse(sql);
+    String[] expectedColumnNames = {
+        "firstwithtime(boolColumn,timestampColumn,'BOOLEAN')",
+        "firstwithtime(intColumn,timestampColumn,'INT')",
+        "firstwithtime(longColumn,timestampColumn,'LONG')",
+        "firstwithtime(floatColumn,timestampColumn,'FLOAT')",
+        "firstwithtime(doubleColumn,timestampColumn,'DOUBLE')",
+        "firstwithtime(stringColumn,timestampColumn,'STRING')"
+    };
+    ColumnDataType[] expectedColumnTypes = {
+        ColumnDataType.BOOLEAN,
+        ColumnDataType.INT,
+        ColumnDataType.LONG,
+        ColumnDataType.FLOAT,
+        ColumnDataType.DOUBLE,
+        ColumnDataType.STRING
+    };
+    Object[] expectedRow = {
+        _expectedResultFirstBoolean,
+        _expectedResultFirstInt,
+        _expectedResultFirstLong,
+        _expectedResultFirstFloat,
+        _expectedResultFirstDouble,
+        _expectedResultFirstString
+    };
+    ResultTable expectedTable =
+        new ResultTable(new DataSchema(expectedColumnNames, expectedColumnTypes),
+            Collections.singletonList(expectedRow));
+    QueriesTestUtils.testInterSegmentsResult(response, 4 * NUM_RECORDS, 0L, 4 * 7 * NUM_RECORDS,
+        4 * NUM_RECORDS, expectedTable);
+  }
+
+  @Test
+  public void testAggregationOnlyNoDictionary() {
+    String query = "SELECT "
+        + "FIRSTWITHTIME(boolNoDictColumn,timestampColumn,'boolean'), "
+        + "FIRSTWITHTIME(intNoDictColumn,timestampColumn,'int'), "
+        + "FIRSTWITHTIME(longNoDictColumn,timestampColumn,'long'), "
+        + "FIRSTWITHTIME(floatNoDictColumn,timestampColumn,'float'), "
+        + "FIRSTWITHTIME(doubleNoDictColumn,timestampColumn,'double'), "
+        + "FIRSTWITHTIME(stringNoDictColumn,timestampColumn,'string') "
+        + "FROM testTable";
+
+    // Inner segment
+    AggregationOperator aggregationOperator = getOperator(query);
+    IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    
QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
 NUM_RECORDS, 0,
+        7 * NUM_RECORDS, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getAggregationResult();
+    assertNotNull(aggregationResult);
+    assertEquals(((IntLongPair) aggregationResult.get(0)).getValue() != 0, 
_expectedResultFirstBoolean.booleanValue());
+    assertEquals(((IntLongPair) aggregationResult.get(1)).getValue(), 
_expectedResultFirstInt);
+    assertEquals(((LongLongPair) aggregationResult.get(2)).getValue(), 
_expectedResultFirstLong);
+    assertEquals(((FloatLongPair) aggregationResult.get(3)).getValue(), 
_expectedResultFirstFloat);
+    assertEquals(((DoubleLongPair) aggregationResult.get(4)).getValue(), 
_expectedResultFirstDouble);
+    assertEquals(((StringLongPair) aggregationResult.get(5)).getValue(), 
_expectedResultFirstString);
+
+    // Inter segments (expect 4 * inner segment result)
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    DataSchema expectedDataSchema = new DataSchema(new String[]{
+        "firstwithtime(boolNoDictColumn,timestampColumn,'BOOLEAN')",
+        "firstwithtime(intNoDictColumn,timestampColumn,'INT')",
+        "firstwithtime(longNoDictColumn,timestampColumn,'LONG')",
+        "firstwithtime(floatNoDictColumn,timestampColumn,'FLOAT')",
+        "firstwithtime(doubleNoDictColumn,timestampColumn,'DOUBLE')",
+        "firstwithtime(stringNoDictColumn,timestampColumn,'STRING')"
+    }, new ColumnDataType[]{
+        ColumnDataType.BOOLEAN,
+        ColumnDataType.INT,
+        ColumnDataType.LONG,
+        ColumnDataType.FLOAT,
+        ColumnDataType.DOUBLE,
+        ColumnDataType.STRING
+    });
+    Object[] expectedResults = new Object[]{
+        _expectedResultFirstBoolean,
+        _expectedResultFirstInt,
+        _expectedResultFirstLong,
+        _expectedResultFirstFloat,
+        _expectedResultFirstDouble,
+        _expectedResultFirstString
+    };
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 4 * NUM_RECORDS, 
0L, 4 * 7 * NUM_RECORDS,
+        4 * NUM_RECORDS, new ResultTable(expectedDataSchema, 
Collections.singletonList(expectedResults)));
+  }
+
+  /** Groups by the single-value intColumn and verifies FIRSTWITHTIME on the regular columns for every group. */
+  @Test
+  public void testAggregationGroupBySV() {
+    String selectClause = String.join(", ",
+        "SELECT intColumn AS key",
+        "FIRSTWITHTIME(boolColumn,timestampColumn,'boolean') AS v1",
+        "FIRSTWITHTIME(intColumn,timestampColumn,'int') AS v2",
+        "FIRSTWITHTIME(longColumn,timestampColumn,'long') AS v3",
+        "FIRSTWITHTIME(floatColumn,timestampColumn,'float') AS v4",
+        "FIRSTWITHTIME(doubleColumn,timestampColumn,'double') AS v5",
+        "FIRSTWITHTIME(stringColumn,timestampColumn,'string') AS v6");
+    // 7 projected columns: the six data columns (intColumn is also the key) plus timestampColumn
+    verifyAggregationGroupBy(selectClause + " FROM testTable GROUP BY key", 7);
+  }
+
+  /** Groups by intNoDictColumn and verifies FIRSTWITHTIME on the "NoDict" columns for every group. */
+  @Test
+  public void testAggregationGroupBySVNoDictionary() {
+    String selectClause = String.join(", ",
+        "SELECT intNoDictColumn AS key",
+        "FIRSTWITHTIME(boolNoDictColumn,timestampColumn,'boolean') AS v1",
+        "FIRSTWITHTIME(intNoDictColumn,timestampColumn,'int') AS v2",
+        "FIRSTWITHTIME(longNoDictColumn,timestampColumn,'long') AS v3",
+        "FIRSTWITHTIME(floatNoDictColumn,timestampColumn,'float') AS v4",
+        "FIRSTWITHTIME(doubleNoDictColumn,timestampColumn,'double') AS v5",
+        "FIRSTWITHTIME(stringNoDictColumn,timestampColumn,'string') AS v6");
+    // 7 projected columns: the six data columns (intNoDictColumn is also the key) plus timestampColumn
+    verifyAggregationGroupBy(selectClause + " FROM testTable GROUP BY key", 7);
+  }
+
+  /** Groups by the multi-value intMVColumn and verifies FIRSTWITHTIME on the regular columns for every group. */
+  @Test
+  public void testAggregationGroupByMV() {
+    String selectClause = String.join(", ",
+        "SELECT intMVColumn AS key",
+        "FIRSTWITHTIME(boolColumn,timestampColumn,'boolean') AS v1",
+        "FIRSTWITHTIME(intColumn,timestampColumn,'int') AS v2",
+        "FIRSTWITHTIME(longColumn,timestampColumn,'long') AS v3",
+        "FIRSTWITHTIME(floatColumn,timestampColumn,'float') AS v4",
+        "FIRSTWITHTIME(doubleColumn,timestampColumn,'double') AS v5",
+        "FIRSTWITHTIME(stringColumn,timestampColumn,'string') AS v6");
+    // 8 projected columns: intMVColumn (the key), the six data columns and timestampColumn
+    verifyAggregationGroupBy(selectClause + " FROM testTable GROUP BY key", 8);
+  }
+
+  /** Groups by the multi-value intMVColumn and verifies FIRSTWITHTIME on the "NoDict" columns for every group. */
+  @Test
+  public void testAggregationGroupByMVNoDictionary() {
+    String selectClause = String.join(", ",
+        "SELECT intMVColumn AS key",
+        "FIRSTWITHTIME(boolNoDictColumn,timestampColumn,'boolean') AS v1",
+        "FIRSTWITHTIME(intNoDictColumn,timestampColumn,'int') AS v2",
+        "FIRSTWITHTIME(longNoDictColumn,timestampColumn,'long') AS v3",
+        "FIRSTWITHTIME(floatNoDictColumn,timestampColumn,'float') AS v4",
+        "FIRSTWITHTIME(doubleNoDictColumn,timestampColumn,'double') AS v5",
+        "FIRSTWITHTIME(stringNoDictColumn,timestampColumn,'string') AS v6");
+    // 8 projected columns: intMVColumn (the key), the six data columns and timestampColumn
+    verifyAggregationGroupBy(selectClause + " FROM testTable GROUP BY key", 8);
+  }
+
+  private void verifyAggregationGroupBy(String query, int numProjectedColumns) // shared group-by verifier
{ // numProjectedColumns scales the expected post-filter entries scanned (see both statistics checks below)
+    // Inner segment: run the group-by operator and check its per-group intermediate results
+    AggregationGroupByOrderByOperator groupByOperator = getOperator(query);
+    IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
+    
QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(),
 NUM_RECORDS, 0,
+        numProjectedColumns * (long) NUM_RECORDS, NUM_RECORDS); // (long) cast makes the product a long
+    AggregationGroupByResult aggregationGroupByResult = 
resultsBlock.getAggregationGroupByResult();
+    assertNotNull(aggregationGroupByResult);
+    int numGroups = 0;
+    Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = 
aggregationGroupByResult.getGroupKeyIterator();
+    while (groupKeyIterator.hasNext()) {
+      numGroups++;
+      GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+      Integer key = (Integer) groupKey._keys[0]; // visible callers group by one expression, so the key is element 0
+      assertTrue(_intGroupValues.containsKey(key)); // every returned group key must be an expected key
+      assertEquals(((IntLongPair) // aggregate 0 (boolean) is read as an IntLongPair; non-zero value means true
aggregationGroupByResult.getResultForGroupId(0, groupKey._groupId)).getValue() 
!= 0,
+          (boolean) _boolGroupValues.get(key));
+      assertEquals(((IntLongPair) 
aggregationGroupByResult.getResultForGroupId(1, groupKey._groupId)).getValue(),
+          _intGroupValues.get(key));
+      assertEquals(((LongLongPair) 
aggregationGroupByResult.getResultForGroupId(2, groupKey._groupId)).getValue(),
+          _longGroupValues.get(key));
+      assertEquals(((FloatLongPair) 
aggregationGroupByResult.getResultForGroupId(3, groupKey._groupId)).getValue(),
+          _floatGroupValues.get(key));
+      assertEquals(((DoubleLongPair) 
aggregationGroupByResult.getResultForGroupId(4, groupKey._groupId)).getValue(),
+          _doubleGroupValues.get(key));
+      assertEquals(((StringLongPair) 
aggregationGroupByResult.getResultForGroupId(5, groupKey._groupId)).getValue(),
+          _stringGroupValues.get(key));
+    }
+    assertEquals(numGroups, _intGroupValues.size()); // number of groups iterated must match the number of expected keys
+
+    // Inter segments: broker-level counts are expected to be 4 * the inner-segment counts
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 
numProjectedColumns * (long) NUM_RECORDS);
+    assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    DataSchema expectedDataSchema =
+        new DataSchema(new String[]{"key", "v1", "v2", "v3", "v4", "v5", 
"v6"}, new ColumnDataType[]{
+            ColumnDataType.INT, ColumnDataType.BOOLEAN, ColumnDataType.INT, 
ColumnDataType.LONG,
+            ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING
+        });
+    assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 10); // NOTE(review): hard-coded 10; presumably the default row limit or key count - confirm
+    for (Object[] row : rows) {
+      assertEquals(row.length, 7); // key + v1..v6, matching the expected schema above
+      int key = (Integer) row[0];
+      assertEquals(row[1], _boolGroupValues.get(key)); // compared directly against the expected map value (no != 0 step)
+      assertEquals(row[2], _intGroupValues.get(key));
+      assertEquals(row[3], _longGroupValues.get(key));
+      assertEquals(row[4], _floatGroupValues.get(key));
+      assertEquals(row[5], _doubleGroupValues.get(key));
+      assertEquals(row[6], _stringGroupValues.get(key));
+    }
+  }
+
+  @AfterClass // Cleanup hook: releases the index segment, then removes the on-disk index directory
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR); // deletes INDEX_DIR recursively; may throw IOException
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 889ade6e42..a1a435600a 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -37,6 +37,7 @@ public enum AggregationFunctionType {
   SUMPRECISION("sumPrecision"),
   AVG("avg"),
   MODE("mode"),
+  FIRSTWITHTIME("firstWithTime"),
   LASTWITHTIME("lastWithTime"),
   MINMAXRANGE("minMaxRange"),
   DISTINCTCOUNT("distinctCount"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to