somandal commented on code in PR #10636:
URL: https://github.com/apache/pinot/pull/10636#discussion_r1173079326


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ChildAggregationFunction.java:
##########
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.IntAggregateResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.IntGroupByResultHolder;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+

Review Comment:
   Please add Javadoc for this class (and for the other newly added classes 
where it makes sense). Call out the use of the dummy result holders and why 
they are needed.
   
   Is my understanding correct that these child functions are just 
placeholders, used only to map the parent aggregation result to the correct 
column in the output schema and to the correct child agg function?



##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -213,6 +215,8 @@ public static ObjectType getObjectType(Object value) {
         return ObjectType.VarianceTuple;
       } else if (value instanceof PinotFourthMoment) {
         return ObjectType.PinotFourthMoment;
+      } else if (value instanceof org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxObject) {

Review Comment:
   nit: the fully qualified name isn't needed here; a regular import would do



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxObject.java:
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+
+
+public class ArgMinMaxObject implements ParentAggregationFunctionResultObject {
+
+  // if the object is created but not yet populated, this happens e.g. when a server has no data for
+  // the query and returns a default value
+  public static final int NOT_NULL_OBJECT = 1;
+  public static final int IS_NULL_OBJECT = 0;

Review Comment:
   Just wondering whether it might be cleaner to use an enum here and check 
the enum's value (0 or 1) instead? Feel free to ignore this suggestion.
   
   Also, nit: rename NOT_NULL_OBJECT -> NON_NULL_OBJECT and IS_NULL_OBJECT -> 
NULL_OBJECT.
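   
   For illustration only, a minimal sketch of what the enum idea could look 
like. The names `NullMarkerSketch`, `ObjectNullState`, `getState` and 
`fromState` are hypothetical and not taken from the PR; the 0/1 values mirror 
the two constants above.

```java
// Hypothetical sketch: the class, enum and method names are illustrative, not from the PR.
final class NullMarkerSketch {

  // Replaces the two int constants; each entry carries the marker value (0 or 1).
  enum ObjectNullState {
    NULL(0),
    NON_NULL(1);

    private final int _state;

    ObjectNullState(int state) {
      _state = state;
    }

    // Marker value to write when serializing.
    int getState() {
      return _state;
    }

    // Maps a marker value read back during deserialization to the enum entry.
    static ObjectNullState fromState(int state) {
      for (ObjectNullState value : values()) {
        if (value._state == state) {
          return value;
        }
      }
      throw new IllegalArgumentException("Unknown null-state marker: " + state);
    }
  }

  private NullMarkerSketch() {
  }
}
```

   One possible benefit is that an unexpected marker value fails fast in 
`fromState` instead of being silently treated as one of the two cases.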



##########
pinot-core/src/test/java/org/apache/pinot/queries/ArgMinMaxTest.java:
##########
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Queries test for histogram queries.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ArgMinMaxTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"HistogramQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final int NUM_RECORDS = 2000;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String MV_INT_COLUMN = "mvIntColumn";
+  private static final String MV_BYTES_COLUMN = "mvBytesColumn";
+  private static final String MV_STRING_COLUMN = "mvStringColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String GROUP_BY_INT_COLUMN = "groupByIntColumn";
+  private static final String GROUP_BY_MV_INT_COLUMN = "groupByMVIntColumn";
+  private static final String GROUP_BY_INT_COLUMN2 = "groupByIntColumn2";
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, 
DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, 
DataType.DOUBLE).addMultiValueDimension(MV_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(MV_BYTES_COLUMN, DataType.BYTES)
+      .addMultiValueDimension(MV_STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(GROUP_BY_MV_INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN2, DataType.INT)
+      .build();
+  private static final TableConfig TABLE_CONFIG =
+      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return " WHERE intColumn >=  500";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    String[] stringSVVals = new String[]{"a2", "a3", "a4", "a5", "a6", "a7", 
"a8", "a9", "a11", "a22"};
+    int j = 1;
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, i);
+      record.putValue(LONG_COLUMN, (long) i - NUM_RECORDS / 2);
+      record.putValue(FLOAT_COLUMN, (float) i * 0.5);
+      record.putValue(DOUBLE_COLUMN, (double) i);
+      record.putValue(MV_INT_COLUMN, Arrays.asList(i, i + 1, i + 2));
+      record.putValue(MV_BYTES_COLUMN, 
Arrays.asList(String.valueOf(i).getBytes(), String.valueOf(i + 1).getBytes(),
+          String.valueOf(i + 2).getBytes()));
+      record.putValue(MV_STRING_COLUMN, Arrays.asList("a" + i, "a" + i + 1, 
"a" + i + 2));
+      if (i < 20) {
+        record.putValue(STRING_COLUMN, stringSVVals[i % stringSVVals.length]);
+      } else {
+        record.putValue(STRING_COLUMN, "a33");
+      }
+      record.putValue(GROUP_BY_INT_COLUMN, i % 5);
+      record.putValue(GROUP_BY_MV_INT_COLUMN, Arrays.asList(i % 10, (i + 1) % 
10));
+      if (i == j) {
+        j *= 2;
+      }
+      record.putValue(GROUP_BY_INT_COLUMN2, j);
+      records.add(record);
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+
+    QueryRewriterFactory.init(String.join(",", 
QueryRewriterFactory.DEFAULT_QUERY_REWRITERS_CLASS_NAMES)
+        + ",org.apache.pinot.sql.parsers.rewriter.ArgMinMaxRewriter");
+    ResultRewriterFactory
+        
.init("org.apache.pinot.core.query.utils.rewriter.ParentAggregationResultRewriter");
+  }
+
+  @Test
+  public void testAggregationInterSegment() {
+    // Simple inter segment
+    String query = "SELECT arg_max(intColumn, longColumn) FROM testTable";
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+
+    assertEquals(rows.get(0)[0], 999L);
+    assertEquals(rows.get(1)[0], 999L);
+    assertEquals(rows.size(), 2);
+
+    // Inter segment data type test
+    query = "SELECT arg_max(intColumn, longColumn), arg_max(intColumn, 
floatColumn), "
+        + "arg_max(intColumn, doubleColumn), arg_min(intColumn, mvIntColumn), "
+        + "arg_min(intColumn, mvStringColumn), arg_min(intColumn, intColumn) 
FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(resultTable.getDataSchema().getColumnName(0), 
"argmax([intColumn, longColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(1), 
"argmax([intColumn, floatColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(2), 
"argmax([intColumn, doubleColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(3), 
"argmin([intColumn, mvIntColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(4), 
"argmin([intColumn, mvStringColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(5), 
"argmin([intColumn, intColumn])");
+
+    assertEquals(rows.size(), 2);
+    assertEquals(rows.get(0)[0], 999L);
+    assertEquals(rows.get(1)[0], 999L);
+    assertEquals(rows.get(0)[1], 999.5F);
+    assertEquals(rows.get(1)[1], 999.5F);
+    assertEquals(rows.get(0)[2], 1999D);
+    assertEquals(rows.get(1)[2], 1999D);
+    assertEquals(rows.get(0)[3], new Integer[]{0, 1, 2});
+    assertEquals(rows.get(1)[3], new Integer[]{0, 1, 2});
+    assertEquals(rows.get(0)[4], new String[]{"a0", "a01", "a02"});
+    assertEquals(rows.get(1)[4], new String[]{"a0", "a01", "a02"});
+    assertEquals(rows.get(0)[5], 0);
+    assertEquals(rows.get(1)[5], 0);
+
+    // Inter segment mix aggregation function with different result length
+    // Inter segment string column comparison test
+    query = "SELECT sum(intColumn), argmin(stringColumn, doubleColumn), 
argmin(stringColumn, stringColumn), "
+        + "argmin(stringColumn, doubleColumn, doubleColumn) FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(rows.size(), 4);
+
+    assertEquals(rows.get(0)[0], 7996000D);
+    assertEquals(rows.get(0)[1], 8D);
+    assertEquals(rows.get(0)[2], "a11");
+    assertEquals(rows.get(0)[3], 8D);
+
+    assertNull(rows.get(1)[0]);
+    assertEquals(rows.get(1)[1], 18D);
+    assertEquals(rows.get(1)[2], "a11");
+    assertEquals(rows.get(1)[3], 8D);
+
+    assertNull(rows.get(2)[0]);
+    assertEquals(rows.get(2)[1], 8D);
+    assertEquals(rows.get(2)[2], "a11");
+    assertNull(rows.get(2)[3]);
+
+    assertNull(rows.get(3)[0]);
+    assertEquals(rows.get(3)[1], 18D);
+    assertEquals(rows.get(3)[2], "a11");
+    assertNull(rows.get(3)[3]);
+
+    // Inter segment mix aggregation function with CASE statement
+    query = "SELECT argmin(CASE WHEN stringColumn = 'a33' THEN 'b' WHEN 
stringColumn = 'a22' THEN 'a' ELSE 'c' END"
+        + ", stringColumn), argmin(CASE WHEN stringColumn = 'a33' THEN 'b' 
WHEN stringColumn = 'a22' THEN 'a' "
+        + "ELSE 'c' END, CASE WHEN stringColumn = 'a33' THEN 'b' WHEN 
stringColumn = 'a22' THEN 'a' ELSE 'c' END) "
+        + "FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(rows.size(), 4);
+
+    assertEquals(rows.get(0)[0], "a22");
+    assertEquals(rows.get(0)[1], "a");
+    assertEquals(rows.get(1)[0], "a22");
+    assertEquals(rows.get(1)[1], "a");
+
+    //   TODO: The following query results in an exception, fix the support 
for multi-value bytes
+    //   query = "SELECT arg_min(intColumn, mvBytesColumn) FROM testTable";
+    //
+    //   brokerResponse = getBrokerResponse(query);
+    //   resultTable = brokerResponse.getResultTable();
+    //   rows = resultTable.getRows();
+  }
+
+  @Test
+  public void testEmptyAggregation() {
+    // Inter segment mix aggregation with no documents after filtering
+    String query =
+        "SELECT arg_max(intColumn, longColumn), argmin(CASE WHEN stringColumn 
= 'a33' THEN 'b' "
+            + "WHEN stringColumn = 'a22' THEN 'a' ELSE 'c' END"
+            + ", stringColumn) FROM testTable where intColumn > 10000";
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertNull(rows.get(0)[0]);
+    assertNull(rows.get(0)[1]);
+    assertEquals(resultTable.getDataSchema().getColumnName(0), "argmax([intColumn, longColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(1),
+        "argmin([case(equals(stringColumn,'a33'),equals(stringColumn,'a22'),'b','a','c'), stringColumn])");
+  }
+
+  @Test
+  public void testGroupByInterSegment() {

Review Comment:
   Can you add a few group-by tests where the MV column is used inside 
argmin/argmax, as either the projection column or the measuring column?



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with the same measuring expressions
+ * are consolidated and added as a single function with a list of projection expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the projection expression,
+ * used as column key for result column filler.
+ * Latter, the aggregation, result of the consolidated function will be filled into the corresponding
+ * columns of the original ARG_MIN/ARG_MAX. For more syntax details please refer to ParentAggregationFunction,
+ * ChildAggregationFunction and ChildAggregationResultRewriter.
+ */
+public class ArgMinMaxRewriter implements QueryRewriter {
+
+  private static final String ARG_MAX = "argmax";
+  private static final String ARG_MIN = "argmin";
+
+  private static final String ARG_MAX_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + 
ARG_MAX;
+  private static final String ARG_MIN_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + 
ARG_MIN;
+
+  @Override
+  public PinotQuery rewrite(PinotQuery pinotQuery) {
+    // This map stores the mapping from the list of measuring expressions to 
the set of projection expressions
+    HashMap<List<Expression>, Set<Expression>> argMinFunctionMap = new 
HashMap<>();
+    // This map stores the mapping from the list of measuring expressions to 
the function ID
+    HashMap<List<Expression>, Integer> argMinFunctionIDMap = new HashMap<>();
+
+    HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap = new 
HashMap<>();
+    HashMap<List<Expression>, Integer> argMaxFunctionIDMap = new HashMap<>();
+
+    Iterator<Expression> iterator = pinotQuery.getSelectList().iterator();
+    while (iterator.hasNext()) {
+      boolean added = extractAndRewriteArgMinMaxFunctions(iterator.next(), 
argMaxFunctionMap, argMaxFunctionIDMap,
+          argMinFunctionMap, argMinFunctionIDMap);
+      // Remove the original function if it is not added, meaning it is a 
duplicate
+      if (!added) {
+        iterator.remove();
+      }
+    }
+
+    appendParentArgMinMaxFunctions(false, pinotQuery.getSelectList(), 
argMinFunctionMap, argMinFunctionIDMap);
+    appendParentArgMinMaxFunctions(true, pinotQuery.getSelectList(), 
argMaxFunctionMap, argMaxFunctionIDMap);
+
+    return pinotQuery;
+  }
+
+  /**
+   * This method appends the consolidated ARG_MIN/ARG_MAX functions to the end 
of the selection list.
+   * The consolidated function call will be in the following format:
+   * ARG_MAX(functionID, numMeasuringColumns, measuringColumn1, 
measuringColumn2, ...,
+   *  projectionColumn1, projectionColumn2, ...)
+   *  where functionID is the ID of the consolidated function, 
numMeasuringColumns is the number of measuring
+   *  columns, measuringColumn1, measuringColumn2, ... are the measuring 
columns, and projectionColumn1,
+   *  projectionColumn2, ... are the projection columns.
+   *  The number of projection columns is the same as the number of 
ARG_MIN/ARG_MAX functions with the same
+   *  measuring columns.
+   */
+  private void appendParentArgMinMaxFunctions(boolean isMax, List<Expression> 
selectList,
+      HashMap<List<Expression>, Set<Expression>> argMinMaxFunctionMap,
+      HashMap<List<Expression>, Integer> argMinMaxFunctionIDMap) {
+    for (Map.Entry<List<Expression>, Set<Expression>> entry : 
argMinMaxFunctionMap.entrySet()) {
+      Literal functionID = new Literal();
+      functionID.setLongValue(argMinMaxFunctionIDMap.get(entry.getKey()));
+      Literal numMeasuringColumns = new Literal();
+      numMeasuringColumns.setLongValue(entry.getKey().size());
+
+      Function parentFunction = new Function(isMax ? ARG_MAX_PARENT : 
ARG_MIN_PARENT);
+      parentFunction.addToOperands(new 
Expression(ExpressionType.LITERAL).setLiteral(functionID));
+      parentFunction.addToOperands(new 
Expression(ExpressionType.LITERAL).setLiteral(numMeasuringColumns));
+      for (Expression expression : entry.getKey()) {
+        parentFunction.addToOperands(expression);
+      }
+      for (Expression expression : entry.getValue()) {
+        parentFunction.addToOperands(expression);
+      }
+      selectList.add(new 
Expression(ExpressionType.FUNCTION).setFunctionCall(parentFunction));
+    }
+  }
+
+  /**
+   * This method extracts the ARG_MIN/ARG_MAX functions from the given 
expression and rewrites the functions
+   * with the same measuring expressions to use the same function ID.
+   * @return true if the function is not duplicated, false otherwise.
+   */
+  private boolean extractAndRewriteArgMinMaxFunctions(Expression expression,
+      HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap,
+      HashMap<List<Expression>, Integer> argMaxFunctionIDMap,
+      HashMap<List<Expression>, Set<Expression>> argMinFunctionMap,
+      HashMap<List<Expression>, Integer> argMinFunctionIDMap) {
+    Function function = expression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    String functionName = function.getOperator();
+    if (!(functionName.equals("argmin") || functionName.equals("argmax"))) {
+      return true;
+    }
+    List<Expression> operands = function.getOperands();
+    List<Expression> argMinMaxMeasuringExpressions = new ArrayList<>();
+    for (int i = 0; i < operands.size() - 1; i++) {
+      argMinMaxMeasuringExpressions.add(operands.get(i));
+    }
+    Expression argMinMaxProjectionExpression = operands.get(operands.size() - 1);
+
+    if (functionName.equals("argmin")) {
+      return updateArgMinMaxFunctionMap(argMinMaxMeasuringExpressions, argMinMaxProjectionExpression, argMinFunctionMap,
+          argMinFunctionIDMap, function);
+    } else {
+      return updateArgMinMaxFunctionMap(argMinMaxMeasuringExpressions, argMinMaxProjectionExpression, argMaxFunctionMap,
+          argMaxFunctionIDMap, function);
+    }
+  }
+
+  /**
+   * This method rewrites the ARG_MIN/ARG_MAX function with the given 
measuring expressions to use the same
+   * function ID.
+   * @return true if the function is not duplicated, false otherwise.
+   */
+  boolean updateArgMinMaxFunctionMap(List<Expression> argMinMaxMeasuringExpressions,

Review Comment:
   nit: can this be private?
   
   Also, shouldn't the parameter `HashMap<List<Expression>, Integer> 
argMaxFunctionIDMap` be called `argMinMaxFunctionIDMap`, since the method is 
called for both argmin and argmax?
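   
   To make the naming point concrete, here is a simplified, hypothetical 
stand-in for the shared helper. Plain strings replace Pinot's `Expression`, 
and the class `ArgMinMaxGroupingSketch` and its `register`, `update` and 
`idFor` methods are assumptions for illustration, not the PR's code. The 
helper is private and its map parameters carry min/max-neutral names because 
both code paths call it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in: strings replace Pinot's Expression type.
final class ArgMinMaxGroupingSketch {
  private final Map<List<String>, Set<String>> _argMinFunctionMap = new HashMap<>();
  private final Map<List<String>, Integer> _argMinFunctionIDMap = new HashMap<>();
  private final Map<List<String>, Set<String>> _argMaxFunctionMap = new HashMap<>();
  private final Map<List<String>, Integer> _argMaxFunctionIDMap = new HashMap<>();

  // Returns true if this (measuring, projection) pair was not seen before.
  boolean register(boolean isMax, List<String> measuring, String projection) {
    return isMax
        ? update(measuring, projection, _argMaxFunctionMap, _argMaxFunctionIDMap)
        : update(measuring, projection, _argMinFunctionMap, _argMinFunctionIDMap);
  }

  // Function ID assigned to the measuring list, or -1 if it was never registered.
  int idFor(boolean isMax, List<String> measuring) {
    return (isMax ? _argMaxFunctionIDMap : _argMinFunctionIDMap).getOrDefault(measuring, -1);
  }

  // Private helper shared by both paths, hence the neutral "argMinMax" parameter names.
  private static boolean update(List<String> measuring, String projection,
      Map<List<String>, Set<String>> argMinMaxFunctionMap,
      Map<List<String>, Integer> argMinMaxFunctionIDMap) {
    // The first time a measuring list is seen, it gets the next ID within its own map.
    argMinMaxFunctionIDMap.putIfAbsent(measuring, argMinMaxFunctionIDMap.size());
    // Set.add returns false when the projection is already present, i.e. a duplicate.
    return argMinMaxFunctionMap.computeIfAbsent(measuring, k -> new HashSet<>()).add(projection);
  }
}
```

   In this sketch the IDs are numbered per map; whether the PR numbers argmin 
and argmax functions separately is not something the sketch claims.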



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+    if (functionName.equals("argmin")) {

Review Comment:
   nit: use the constants that were defined here?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ParentAggregationResultRewriter.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.DataSchema;
+import 
org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Use the result of parent aggregation functions to populate the result of 
child aggregation functions.
+ * This implementation is based on the column names of the result schema.
+ * The result column name of a parent aggregation function has the following 
format:
+ * CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + 
aggregationFunctionType + FunctionID
+ * The result column name of corresponding child aggregation function has the 
following format:
+ * aggregationFunctionType + FunctionID + 
CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ * + childFunctionKey
+ * This approach will not work with `AS` clauses as they alter the column 
names.
+ * TODO: Add support for `AS` clauses.
+ */
+public class ParentAggregationResultRewriter implements ResultRewriter {
+  public ParentAggregationResultRewriter() {
+  }
+
+  public static Map<String, ChildFunctionMapping> 
createChildFunctionMapping(DataSchema schema, Object[] row) {
+    Map<String, ChildFunctionMapping> childFunctionMapping = new HashMap<>();
+    for (int i = 0; i < schema.size(); i++) {
+      String columnName = schema.getColumnName(i);
+      if 
(columnName.startsWith(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX))
 {
+        ParentAggregationFunctionResultObject parent = 
(ParentAggregationFunctionResultObject) row[i];
+
+        DataSchema nestedSchema = parent.getSchema();
+        for (int j = 0; j < nestedSchema.size(); j++) {
+          String childColumnKey = nestedSchema.getColumnName(j);
+          String originalChildFunctionKey =
+              
columnName.substring(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX.length())
+                  + CommonConstants.RewriterConstants.CHILD_KEY_SEPERATOR + 
childColumnKey;
+          // aggregationFunctionType + childFunctionID + CHILD_KEY_SEPERATOR + 
childFunctionKeyInParent
+          childFunctionMapping.put(originalChildFunctionKey, new 
ChildFunctionMapping(parent, j, i));
+        }
+      }
+    }
+    return childFunctionMapping;
+  }
+
+  public RewriterResult rewrite(DataSchema dataSchema, List<Object[]> rows) {
+    int numParentAggregationFunctions = 0;
+    // Count the number of parent aggregation functions
+    for (int i = 0; i < dataSchema.size(); i++) {
+      if 
(dataSchema.getColumnName(i).startsWith(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX))
 {
+        numParentAggregationFunctions++;
+      }
+    }
+
+    if (numParentAggregationFunctions == 0 || rows.isEmpty()) {

Review Comment:
   I'd return right away (without calculating `numParentAggregationFunctions`) 
if `rows` is empty. That avoids the extra walk over the DataSchema columns.
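
   A minimal sketch of the ordering I have in mind. It uses a plain list of 
column names as a stand-in for `DataSchema`, and the class and method names are 
hypothetical, not the PR's code:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class EarlyReturnSketch {
  // Same value as CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX in this PR.
  public static final String PREFIX = "pinotparentaggregation";

  // Hypothetical helper: counts parent aggregation columns, but only when there are rows
  // to rewrite. A List<String> of column names replaces DataSchema to keep this self-contained.
  public static int countParentColumns(List<String> columnNames, List<Object[]> rows) {
    // Cheap check first: with no rows there is nothing to rewrite, so skip the schema walk.
    if (rows.isEmpty()) {
      return 0;
    }
    int numParentAggregationFunctions = 0;
    for (String columnName : columnNames) {
      if (columnName.startsWith(PREFIX)) {
        numParentAggregationFunctions++;
      }
    }
    return numParentAggregationFunctions;
  }

  public static void main(String[] args) {
    List<String> columns = Arrays.asList("sum(intColumn)", PREFIX + "argmax0");
    List<Object[]> noRows = Collections.emptyList();
    List<Object[]> oneRow = Collections.singletonList(new Object[]{1, 2});
    System.out.println(countParentColumns(columns, noRows)); // prints 0
    System.out.println(countParentColumns(columns, oneRow)); // prints 1
  }
}
```

   The caller can keep treating a count of 0 as "nothing to rewrite", so the 
existing combined check collapses into the early return.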



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/ParentAggregationFunctionResultObject.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.utils;
+
+import java.io.Serializable;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+public interface ParentAggregationFunctionResultObject

Review Comment:
   Add a javadoc here so it's clearer when and where this interface is used (in 
case someone wants to extend this for some other functionality in the future)



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ParentAggregationFunction.java:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import 
org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public abstract class ParentAggregationFunction<I, F extends 
ParentAggregationFunctionResultObject>

Review Comment:
   Add javadocs here too



##########
pinot-core/src/test/java/org/apache/pinot/queries/ArgMinMaxTest.java:
##########
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Queries test for histogram queries.

Review Comment:
   nit: is this javadoc right? It should probably say arg min/max queries, right?



##########
pinot-core/src/test/java/org/apache/pinot/queries/ArgMinMaxTest.java:
##########
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Queries test for histogram queries.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ArgMinMaxTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"HistogramQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final int NUM_RECORDS = 2000;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String MV_INT_COLUMN = "mvIntColumn";
+  private static final String MV_BYTES_COLUMN = "mvBytesColumn";
+  private static final String MV_STRING_COLUMN = "mvStringColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String GROUP_BY_INT_COLUMN = "groupByIntColumn";
+  private static final String GROUP_BY_MV_INT_COLUMN = "groupByMVIntColumn";
+  private static final String GROUP_BY_INT_COLUMN2 = "groupByIntColumn2";
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, 
DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, 
DataType.DOUBLE).addMultiValueDimension(MV_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(MV_BYTES_COLUMN, DataType.BYTES)
+      .addMultiValueDimension(MV_STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(GROUP_BY_MV_INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN2, DataType.INT)
+      .build();
+  private static final TableConfig TABLE_CONFIG =
+      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return " WHERE intColumn >=  500";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    String[] stringSVVals = new String[]{"a2", "a3", "a4", "a5", "a6", "a7", 
"a8", "a9", "a11", "a22"};
+    int j = 1;
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, i);
+      record.putValue(LONG_COLUMN, (long) i - NUM_RECORDS / 2);
+      record.putValue(FLOAT_COLUMN, (float) i * 0.5);
+      record.putValue(DOUBLE_COLUMN, (double) i);
+      record.putValue(MV_INT_COLUMN, Arrays.asList(i, i + 1, i + 2));
+      record.putValue(MV_BYTES_COLUMN, 
Arrays.asList(String.valueOf(i).getBytes(), String.valueOf(i + 1).getBytes(),
+          String.valueOf(i + 2).getBytes()));
+      record.putValue(MV_STRING_COLUMN, Arrays.asList("a" + i, "a" + i + 1, 
"a" + i + 2));
+      if (i < 20) {
+        record.putValue(STRING_COLUMN, stringSVVals[i % stringSVVals.length]);
+      } else {
+        record.putValue(STRING_COLUMN, "a33");
+      }
+      record.putValue(GROUP_BY_INT_COLUMN, i % 5);
+      record.putValue(GROUP_BY_MV_INT_COLUMN, Arrays.asList(i % 10, (i + 1) % 
10));
+      if (i == j) {
+        j *= 2;
+      }
+      record.putValue(GROUP_BY_INT_COLUMN2, j);
+      records.add(record);
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+
+    QueryRewriterFactory.init(String.join(",", 
QueryRewriterFactory.DEFAULT_QUERY_REWRITERS_CLASS_NAMES)
+        + ",org.apache.pinot.sql.parsers.rewriter.ArgMinMaxRewriter");
+    ResultRewriterFactory
+        
.init("org.apache.pinot.core.query.utils.rewriter.ParentAggregationResultRewriter");
+  }
+
+  @Test
+  public void testAggregationInterSegment() {
+    // Simple inter segment
+    String query = "SELECT arg_max(intColumn, longColumn) FROM testTable";
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+
+    assertEquals(rows.get(0)[0], 999L);
+    assertEquals(rows.get(1)[0], 999L);
+    assertEquals(rows.size(), 2);
+
+    // Inter segment data type test
+    query = "SELECT arg_max(intColumn, longColumn), arg_max(intColumn, 
floatColumn), "
+        + "arg_max(intColumn, doubleColumn), arg_min(intColumn, mvIntColumn), "
+        + "arg_min(intColumn, mvStringColumn), arg_min(intColumn, intColumn) 
FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(resultTable.getDataSchema().getColumnName(0), 
"argmax([intColumn, longColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(1), 
"argmax([intColumn, floatColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(2), 
"argmax([intColumn, doubleColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(3), 
"argmin([intColumn, mvIntColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(4), 
"argmin([intColumn, mvStringColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(5), 
"argmin([intColumn, intColumn])");
+
+    assertEquals(rows.size(), 2);
+    assertEquals(rows.get(0)[0], 999L);
+    assertEquals(rows.get(1)[0], 999L);
+    assertEquals(rows.get(0)[1], 999.5F);
+    assertEquals(rows.get(1)[1], 999.5F);
+    assertEquals(rows.get(0)[2], 1999D);
+    assertEquals(rows.get(1)[2], 1999D);
+    assertEquals(rows.get(0)[3], new Integer[]{0, 1, 2});
+    assertEquals(rows.get(1)[3], new Integer[]{0, 1, 2});
+    assertEquals(rows.get(0)[4], new String[]{"a0", "a01", "a02"});
+    assertEquals(rows.get(1)[4], new String[]{"a0", "a01", "a02"});
+    assertEquals(rows.get(0)[5], 0);
+    assertEquals(rows.get(1)[5], 0);
+
+    // Inter segment mix aggregation function with different result length
+    // Inter segment string column comparison test
+    query = "SELECT sum(intColumn), argmin(stringColumn, doubleColumn), 
argmin(stringColumn, stringColumn), "
+        + "argmin(stringColumn, doubleColumn, doubleColumn) FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(rows.size(), 4);
+
+    assertEquals(rows.get(0)[0], 7996000D);
+    assertEquals(rows.get(0)[1], 8D);
+    assertEquals(rows.get(0)[2], "a11");
+    assertEquals(rows.get(0)[3], 8D);
+
+    assertNull(rows.get(1)[0]);
+    assertEquals(rows.get(1)[1], 18D);
+    assertEquals(rows.get(1)[2], "a11");
+    assertEquals(rows.get(1)[3], 8D);
+
+    assertNull(rows.get(2)[0]);
+    assertEquals(rows.get(2)[1], 8D);
+    assertEquals(rows.get(2)[2], "a11");
+    assertNull(rows.get(2)[3]);
+
+    assertNull(rows.get(3)[0]);
+    assertEquals(rows.get(3)[1], 18D);
+    assertEquals(rows.get(3)[2], "a11");
+    assertNull(rows.get(3)[3]);
+
+    // Inter segment mix aggregation function with CASE statement
+    query = "SELECT argmin(CASE WHEN stringColumn = 'a33' THEN 'b' WHEN 
stringColumn = 'a22' THEN 'a' ELSE 'c' END"
+        + ", stringColumn), argmin(CASE WHEN stringColumn = 'a33' THEN 'b' 
WHEN stringColumn = 'a22' THEN 'a' "
+        + "ELSE 'c' END, CASE WHEN stringColumn = 'a33' THEN 'b' WHEN 
stringColumn = 'a22' THEN 'a' ELSE 'c' END) "
+        + "FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(rows.size(), 4);
+
+    assertEquals(rows.get(0)[0], "a22");
+    assertEquals(rows.get(0)[1], "a");
+    assertEquals(rows.get(1)[0], "a22");
+    assertEquals(rows.get(1)[1], "a");
+
+    //   TODO: The following query results in an exception, fix the support 
for multi-value bytes

Review Comment:
   Perhaps uncomment this code and assert that the exception is thrown. That 
way, once this issue is fixed, the test will fail and force whoever fixes it to 
update the test.
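
   Roughly what I mean, as a self-contained sketch. The exception type and the 
`runUnsupportedQuery` stand-in are assumptions, since I haven't checked what 
the multi-value bytes query actually throws:

```java
public class ExpectedFailureSketch {
  // Hypothetical stand-in for running the multi-value bytes query. It throws to mimic
  // the currently unsupported case; the real exception type may differ.
  public static void runUnsupportedQuery() {
    throw new UnsupportedOperationException("multi-value bytes is not supported yet");
  }

  // Returns true only if the action throws an exception of the expected type.
  public static boolean throwsExpected(Class<? extends Throwable> expected, Runnable action) {
    try {
      action.run();
    } catch (Throwable t) {
      return expected.isInstance(t);
    }
    return false;
  }

  public static void main(String[] args) {
    boolean stillBroken =
        throwsExpected(UnsupportedOperationException.class, ExpectedFailureSketch::runUnsupportedQuery);
    // Once the query starts working, this check fails and points at the stale test.
    if (!stillBroken) {
      throw new AssertionError("Query no longer fails; replace this check with real assertions");
    }
  }
}
```

   In the actual test this would presumably be a single TestNG `assertThrows` 
call (already imported in `ArgMinMaxRewriterTest`) wrapped around 
`getBrokerResponse(query)`.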



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ResultRewriterFactory.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ResultRewriterFactory {
+
+  private ResultRewriterFactory() {
+  }
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResultRewriterFactory.class);
+  static final List<String> DEFAULT_RESULT_REWRITERS_CLASS_NAMES = 
ImmutableList.of();

Review Comment:
   Is this intentionally left empty? Does this mean that anyone who wants to 
use arg min/max must set the broker config 
`pinot.broker.result.rewriter.class.names`?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxObject.java:
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import 
org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+
+
+public class ArgMinMaxObject implements ParentAggregationFunctionResultObject {
+
+  // if the object is created but not yet populated, this happens e.g. when a 
server has no data for
+  // the query and returns a default value
+  public static final int NOT_NULL_OBJECT = 1;
+  public static final int IS_NULL_OBJECT = 0;
+  // if the object contains non null values
+  private boolean _isNull;
+
+  // if the value is stored in a mutable list, this is true only when the 
Object is deserialized
+  // from a byte buffer
+  private boolean _mutable;

Review Comment:
   The code sets `_mutable` to true in the 
`ArgMinMaxObject(DataSchema keySchema, DataSchema valSchema)` constructor and 
to false in the `ByteBuffer` constructor. Does this comment need to be updated, 
or does the flag need to be reversed? It's a bit confusing.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -972,4 +974,11 @@ public static class Range {
   public static class IdealState {
     public static final String HYBRID_TABLE_TIME_BOUNDARY = 
"HYBRID_TABLE_TIME_BOUNDARY";
   }
+
+  public static class RewriterConstants {
+    public static final String PARENT_AGGREGATION_NAME_PREFIX = 
"pinotparentaggregation";
+    public static final String CHILD_AGGREGATION_NAME_PREFIX = 
"pinotchildaggregation";

Review Comment:
   nit: can these values be camel cased? e.g. pinotchildaggregation -> 
pinotChildAggregation



##########
pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriterTest.java:
##########
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+
+public class ArgMinMaxRewriterTest {
+  private static final QueryRewriter QUERY_REWRITER = new ArgMinMaxRewriter();
+
+  @Test
+  public void testQueryRewrite() {

Review Comment:
   I have some suggestions for enhancing the tests here:
   
   - Can you also add a few group-by queries here?
   - Can you add some examples where the measuring columns are different?
   - Can you also add one example where the same columns are used in the 
measuring expressions list, but the order is swapped (e.g. ARG_MIN(col1, col2, 
col3), ARG_MIN(col2, col1, col4))?
   
   I'd especially like to see examples of where the function IDs are non-0
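
   To illustrate why the swapped-order case matters: the ID maps are keyed by 
`List<Expression>`, and list equality is order-sensitive. Here is a small 
sketch with strings standing in for `Expression`. The "next ID = current map 
size" rule is my assumption about how IDs are assigned, and the class name is 
hypothetical:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FunctionIdSketch {
  // Measuring-column list -> function ID, mirroring HashMap<List<Expression>, Integer>.
  private final Map<List<String>, Integer> _functionIds = new HashMap<>();

  // Assumption: an unseen measuring-column list gets the next unused ID (the map size).
  public int idFor(String... measuringColumns) {
    List<String> key = Arrays.asList(measuringColumns);
    Integer id = _functionIds.get(key);
    if (id == null) {
      id = _functionIds.size();
      _functionIds.put(key, id);
    }
    return id;
  }

  public static void main(String[] args) {
    FunctionIdSketch ids = new FunctionIdSketch();
    System.out.println(ids.idFor("col1", "col2")); // prints 0
    System.out.println(ids.idFor("col1", "col2")); // prints 0 (same list, same ID)
    System.out.println(ids.idFor("col2", "col1")); // prints 1 (swapped order is a new key)
  }
}
```

   A rewriter test with swapped measuring columns should therefore end up with 
two parent functions and a non-zero function ID on the second one.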



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ParentAggregationResultRewriter.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.DataSchema;
+import 
org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Use the result of parent aggregation functions to populate the result of 
child aggregation functions.
+ * This implementation is based on the column names of the result schema.
+ * The result column name of a parent aggregation function has the following 
format:
+ * CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + 
aggregationFunctionType + FunctionID
+ * The result column name of corresponding child aggregation function has the 
following format:
+ * aggregationFunctionType + FunctionID + 
CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ * + childFunctionKey
+ * This approach will not work with `AS` clauses as they alter the column 
names.
+ * TODO: Add support for `AS` clauses.
+ */
+public class ParentAggregationResultRewriter implements ResultRewriter {
+  public ParentAggregationResultRewriter() {
+  }
+
+  public static Map<String, ChildFunctionMapping> 
createChildFunctionMapping(DataSchema schema, Object[] row) {

Review Comment:
   nit: does this need to be public? i only see it used in this class



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/QueryRewriterFactory.java:
##########
@@ -33,7 +33,7 @@ private QueryRewriterFactory() {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryRewriterFactory.class);
 
-  static final List<String> DEFAULT_QUERY_REWRITERS_CLASS_NAMES =
+  public static final List<String> DEFAULT_QUERY_REWRITERS_CLASS_NAMES =

Review Comment:
   nit: can you add the `@VisibleForTesting` annotation here?
   Also, I don't see the new `ArgMinMaxRewriter` in the default list. Does this 
mean we intend to enable it only for specific tenants via the broker 
config?
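
   For reference, if the rewriter stays off the default list, enabling it 
amounts to appending its class name to the defaults, which is what 
`ArgMinMaxTest` does in its setup. A minimal sketch of that composition, 
assuming a comma-separated config value; `RewriterListSketch` and the two 
`com.example` entries are hypothetical placeholders, not the actual defaults:

   ```java
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical helper; only the ArgMinMaxRewriter class name comes from this PR.
   final class RewriterListSketch {
     // Placeholder stand-ins for DEFAULT_QUERY_REWRITERS_CLASS_NAMES.
     static final List<String> DEFAULTS =
         Arrays.asList("com.example.RewriterA", "com.example.RewriterB");

     // Builds the comma-separated value that would be handed to QueryRewriterFactory.init(...).
     static String withExtraRewriter(List<String> defaults, String extraClassName) {
       return String.join(",", defaults) + "," + extraClassName;
     }
   }
   ```

   If this is the intended way to opt in, it would be worth saying so in the 
PR description or docs.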



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with 
the same measuring expressions
+ * are consolidated and added as a single function with a list of projection 
expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" 
will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the 
end of the selection list.

Review Comment:
   Can you elaborate on what `PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)` 
means here? For example, what are `#0` and `2`? What determines the value of 
`#0` (the function ID)?
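
   To make the question concrete, this is how I read the operand layout from 
the Javadoc on `appendParentArgMinMaxFunctions`: function ID, number of 
measuring columns, the measuring columns, then the projection columns. A 
minimal sketch under that assumption, with operands modeled as plain strings 
and `ParentOperandsSketch` a hypothetical name:

   ```java
   import java.util.List;

   // Hypothetical model of the parent function operands; not Pinot code.
   final class ParentOperandsSketch {
     final String functionId;
     final List<String> measuringColumns;
     final List<String> projectionColumns;

     ParentOperandsSketch(List<String> operands) {
       // operands[0] is the function ID, operands[1] the number of measuring columns.
       functionId = operands.get(0);
       int numMeasuring = Integer.parseInt(operands.get(1));
       // The next numMeasuring operands are the columns min/max is computed on.
       measuringColumns = operands.subList(2, 2 + numMeasuring);
       // The remaining operands are projected from the row holding the min/max.
       projectionColumns = operands.subList(2 + numMeasuring, operands.size());
     }
   }
   ```

   With the operands `0, 2, col1, col2, col3, col4` this reading gives 
measuring columns `col1, col2` and projection columns `col3, col4`. If that is 
right, please spell it out in the Javadoc.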



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with 
the same measuring expressions
+ * are consolidated and added as a single function with a list of projection 
expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" 
will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the 
end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) 
will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, 
col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the 
projection expression,

Review Comment:
   Please elaborate on how the function ID is decided. Is it determined by the 
measuring expressions, so that each unique list of measuring expressions 
results in a new function ID?
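
   A sketch of the scheme I am assuming, so you can confirm or correct it: 
each distinct list of measuring expressions gets the next unused ID, and a 
repeated list reuses its ID. Expressions are modeled as strings, and 
`FunctionIdSketch` is a hypothetical name; the PR may assign IDs differently:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical illustration of ID assignment; not the PR's implementation.
   final class FunctionIdSketch {
     // Maps a list of measuring expressions to its function ID.
     private final Map<List<String>, Integer> _idMap = new HashMap<>();

     int idFor(List<String> measuringExpressions) {
       Integer existing = _idMap.get(measuringExpressions);
       if (existing != null) {
         return existing;
       }
       // First time this list is seen: use the next unused ID.
       int id = _idMap.size();
       // Copy the key so later changes to the caller's list cannot corrupt the map.
       _idMap.put(new ArrayList<>(measuringExpressions), id);
       return id;
     }
   }
   ```

   The rewriter keeps separate ID maps for ARG_MIN and ARG_MAX, so under this 
assumption there would be one such instance per function type.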



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ParentArgMinMaxAggregationFunction.java:
##########
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxObject;
+import 
org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxWrapperValSet;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class ParentArgMinMaxAggregationFunction extends 
ParentAggregationFunction<ArgMinMaxObject, ArgMinMaxObject> {
+
+  // list of columns that we do min/max on
+  private final List<ExpressionContext> _measuringColumns;
+  // list of columns that we project based on the min/max value
+  private final List<ExpressionContext> _projectionColumns;
+  // true if we are doing argmax, false if we are doing argmin
+  private final boolean _isMax;
+  // the id of the function, this is to associate the result of the parent 
aggregation function with the
+  // child aggregation functions having the same type(argmin/argmax) and 
measuring columns
+  private final ExpressionContext _functionIdContext;
+  private final ExpressionContext _numMeasuringColumnContext;
+  // number of columns that we do min/max on
+  private final int _numMeasuringColumns;
+  // number of columns that we project based on the min/max value
+  private final int _numProjectionColumns;
+
+  // The following variable need to be initialized
+
+  // The wrapper classes for the block value sets
+  private final ThreadLocal<List<ArgMinMaxWrapperValSet>> 
_argMinMaxWrapperMeasuringColumnSets =
+      ThreadLocal.withInitial(ArrayList::new);
+  private final ThreadLocal<List<ArgMinMaxWrapperValSet>> 
_argMinMaxWrapperProjectionColumnSets =
+      ThreadLocal.withInitial(ArrayList::new);
+  // The schema for the measuring columns and projection columns
+  private final ThreadLocal<DataSchema> _measuringColumnSchema = new 
ThreadLocal<>();
+  private final ThreadLocal<DataSchema> _projectionColumnSchema = new 
ThreadLocal<>();
+  // If the schemas are initialized
+  private final ThreadLocal<Boolean> _schemaInitiated = 
ThreadLocal.withInitial(() -> false);

Review Comment:
   nit: rename to `_schemaInitialized`



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DummyGroupByResultHolder.java:
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.groupby;
+
+/**
+ * Placeholder GroupByResultHolder that does noop

Review Comment:
   maybe add a comment on where this placeholder is used?
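
   Something along these lines is what I have in mind for the Javadoc. This is 
only a sketch: `GroupByResultHolderSketch` is a simplified stand-in for the 
real interface (which has more methods), and returning `null` from the getter 
is my assumption, not necessarily what the PR does:

   ```java
   // Simplified, hypothetical stand-in for Pinot's GroupByResultHolder.
   interface GroupByResultHolderSketch {
     void setValueForKey(int groupKey, Object value);

     Object getResult(int groupKey);
   }

   /**
    * Placeholder GroupByResultHolder that performs no operation.
    * Used by ChildAggregationFunction, whose result is populated later from the
    * matching parent aggregation function, so nothing needs to be stored here.
    */
   final class DummyGroupByResultHolderSketch implements GroupByResultHolderSketch {
     @Override
     public void setValueForKey(int groupKey, Object value) {
       // Intentionally a no-op: child functions do not aggregate anything.
     }

     @Override
     public Object getResult(int groupKey) {
       // Nothing is ever stored, so there is nothing to return.
       return null;
     }
   }
   ```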



##########
pinot-core/src/test/java/org/apache/pinot/queries/ArgMinMaxTest.java:
##########
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Queries test for histogram queries.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ArgMinMaxTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"HistogramQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final int NUM_RECORDS = 2000;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String MV_INT_COLUMN = "mvIntColumn";
+  private static final String MV_BYTES_COLUMN = "mvBytesColumn";
+  private static final String MV_STRING_COLUMN = "mvStringColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String GROUP_BY_INT_COLUMN = "groupByIntColumn";
+  private static final String GROUP_BY_MV_INT_COLUMN = "groupByMVIntColumn";
+  private static final String GROUP_BY_INT_COLUMN2 = "groupByIntColumn2";
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, 
DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, 
DataType.DOUBLE).addMultiValueDimension(MV_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(MV_BYTES_COLUMN, DataType.BYTES)
+      .addMultiValueDimension(MV_STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(GROUP_BY_MV_INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN2, DataType.INT)
+      .build();
+  private static final TableConfig TABLE_CONFIG =
+      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return " WHERE intColumn >=  500";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    String[] stringSVVals = new String[]{"a2", "a3", "a4", "a5", "a6", "a7", 
"a8", "a9", "a11", "a22"};
+    int j = 1;
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, i);
+      record.putValue(LONG_COLUMN, (long) i - NUM_RECORDS / 2);
+      record.putValue(FLOAT_COLUMN, (float) i * 0.5);
+      record.putValue(DOUBLE_COLUMN, (double) i);
+      record.putValue(MV_INT_COLUMN, Arrays.asList(i, i + 1, i + 2));
+      record.putValue(MV_BYTES_COLUMN, 
Arrays.asList(String.valueOf(i).getBytes(), String.valueOf(i + 1).getBytes(),
+          String.valueOf(i + 2).getBytes()));
+      record.putValue(MV_STRING_COLUMN, Arrays.asList("a" + i, "a" + i + 1, 
"a" + i + 2));
+      if (i < 20) {
+        record.putValue(STRING_COLUMN, stringSVVals[i % stringSVVals.length]);
+      } else {
+        record.putValue(STRING_COLUMN, "a33");
+      }
+      record.putValue(GROUP_BY_INT_COLUMN, i % 5);
+      record.putValue(GROUP_BY_MV_INT_COLUMN, Arrays.asList(i % 10, (i + 1) % 
10));
+      if (i == j) {
+        j *= 2;
+      }
+      record.putValue(GROUP_BY_INT_COLUMN2, j);
+      records.add(record);
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+
+    QueryRewriterFactory.init(String.join(",", 
QueryRewriterFactory.DEFAULT_QUERY_REWRITERS_CLASS_NAMES)
+        + ",org.apache.pinot.sql.parsers.rewriter.ArgMinMaxRewriter");
+    ResultRewriterFactory
+        
.init("org.apache.pinot.core.query.utils.rewriter.ParentAggregationResultRewriter");

Review Comment:
   Can you add a few test cases where we don't call any argmin/argmax 
aggregation functions while these configs are set? Just to ensure that 
non-argmin/max aggregations and group by queries still work as expected.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ChildAggregationFunction.java:
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.DummyAggregationResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.DummyGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public abstract class ChildAggregationFunction implements 
AggregationFunction<Long, Long> {
+
+  private static final int CHILD_AGGREGATION_FUNCTION_ID_OFFSET = 0;
+  private static final int CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET = 1;
+  private final ExpressionContext _childFunctionKeyInParent;
+  private final List<ExpressionContext> _resultNameOperands;
+  private final ExpressionContext _childFunctionID;
+
+  ChildAggregationFunction(List<ExpressionContext> operands) {
+    _childFunctionID = operands.get(CHILD_AGGREGATION_FUNCTION_ID_OFFSET);
+    _childFunctionKeyInParent = 
operands.get(CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET);
+    _resultNameOperands = 
operands.subList(CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET + 1, 
operands.size());
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    ArrayList<ExpressionContext> expressionContexts = new ArrayList<>();
+    expressionContexts.add(_childFunctionID);
+    expressionContexts.add(_childFunctionKeyInParent);
+    expressionContexts.addAll(_resultNameOperands);
+    return expressionContexts;
+  }
+
+  @Override
+  public final AggregationResultHolder createAggregationResultHolder() {
+    return new DummyAggregationResultHolder();
+  }
+
+  @Override
+  public final GroupByResultHolder createGroupByResultHolder(int 
initialCapacity, int maxCapacity) {
+    return new DummyGroupByResultHolder();
+  }
+
+  @Override
+  public final void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final Long extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    return 0L;
+  }
+
+  @Override
+  public final Long extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+    return 0L;
+  }
+
+  @Override
+  public final Long merge(Long intermediateResult1, Long intermediateResult2) {
+    return 0L;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.UNKNOWN;
+  }
+
+  @Override
+  public final Long extractFinalResult(Long longValue) {
+    return 0L;
+  }
+
+  /**
+   * The name of the column as follows:
+   * CHILD_AGGREGATION_NAME_PREFIX + actual function type + operands + 
CHILD_AGGREGATION_SEPERATOR
+   * + actual function type + parent aggregation function id + 
CHILD_KEY_SEPERATOR + column key in parent function
+   */

Review Comment:
   An example would be very useful here.
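
   For instance, a worked example like the following would help readers. It 
follows the format stated in the Javadoc above, but the constant values and 
the way operands are rendered are placeholders I made up for illustration; the 
real values live in `CommonConstants.RewriterConstants`:

   ```java
   // Hypothetical illustration of the naming scheme; constant values are placeholders.
   final class ChildColumnNameSketch {
     static final String CHILD_AGGREGATION_NAME_PREFIX = "child";
     static final String CHILD_AGGREGATION_SEPERATOR = "@";
     static final String CHILD_KEY_SEPERATOR = "_";

     // prefix + function type + operands + separator
     //   + function type + parent function id + key separator + column key
     static String columnName(String functionType, String operands, int parentFunctionId,
         String columnKey) {
       return CHILD_AGGREGATION_NAME_PREFIX + functionType + operands
           + CHILD_AGGREGATION_SEPERATOR + functionType + parentFunctionId
           + CHILD_KEY_SEPERATOR + columnKey;
     }
   }
   ```

   With these placeholder values, `columnName("argmin", "(col1,col2,col3)", 0, 
"col3")` yields `childargmin(col1,col2,col3)@argmin0_col3`. The Javadoc could 
show the equivalent string with the real constants.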



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with 
the same measuring expressions
+ * are consolidated and added as a single function with a list of projection 
expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" 
will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the 
end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) 
will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, 
col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the 
projection expression,
+ * used as column key for result column filler.
+ * Latter, the aggregation, result of the consolidated function will be filled 
into the corresponding
+ * columns of the original ARG_MIN/ARG_MAX. For more syntax details please 
refer to ParentAggregationFunction,
+ * ChildAggregationFunction and ChildAggregationResultRewriter.
+ */
+public class ArgMinMaxRewriter implements QueryRewriter {
+
+  private static final String ARG_MAX = "argmax";
+  private static final String ARG_MIN = "argmin";
+
+  private static final String ARG_MAX_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + 
ARG_MAX;
+  private static final String ARG_MIN_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + 
ARG_MIN;
+
+  @Override
+  public PinotQuery rewrite(PinotQuery pinotQuery) {
+    // This map stores the mapping from the list of measuring expressions to 
the set of projection expressions
+    HashMap<List<Expression>, Set<Expression>> argMinFunctionMap = new 
HashMap<>();
+    // This map stores the mapping from the list of measuring expressions to 
the function ID
+    HashMap<List<Expression>, Integer> argMinFunctionIDMap = new HashMap<>();
+
+    HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap = new 
HashMap<>();
+    HashMap<List<Expression>, Integer> argMaxFunctionIDMap = new HashMap<>();
+
+    Iterator<Expression> iterator = pinotQuery.getSelectList().iterator();
+    while (iterator.hasNext()) {
+      boolean added = extractAndRewriteArgMinMaxFunctions(iterator.next(), 
argMaxFunctionMap, argMaxFunctionIDMap,
+          argMinFunctionMap, argMinFunctionIDMap);
+      // Remove the original function if it is not added, meaning it is a 
duplicate
+      if (!added) {
+        iterator.remove();
+      }
+    }
+
+    appendParentArgMinMaxFunctions(false, pinotQuery.getSelectList(), 
argMinFunctionMap, argMinFunctionIDMap);
+    appendParentArgMinMaxFunctions(true, pinotQuery.getSelectList(), 
argMaxFunctionMap, argMaxFunctionIDMap);
+
+    return pinotQuery;
+  }
+
+  /**
+   * This method appends the consolidated ARG_MIN/ARG_MAX functions to the end 
of the selection list.
+   * The consolidated function call will be in the following format:
+   * ARG_MAX(functionID, numMeasuringColumns, measuringColumn1, 
measuringColumn2, ...,
+   *  projectionColumn1, projectionColumn2, ...)
+   *  where functionID is the ID of the consolidated function, 
numMeasuringColumns is the number of measuring
+   *  columns, measuringColumn1, measuringColumn2, ... are the measuring 
columns, and projectionColumn1,
+   *  projectionColumn2, ... are the projection columns.
+   *  The number of projection columns is the same as the number of 
ARG_MIN/ARG_MAX functions with the same
+   *  measuring columns.
+   */
+  private void appendParentArgMinMaxFunctions(boolean isMax, List<Expression> 
selectList,
+      HashMap<List<Expression>, Set<Expression>> argMinMaxFunctionMap,
+      HashMap<List<Expression>, Integer> argMinMaxFunctionIDMap) {
+    for (Map.Entry<List<Expression>, Set<Expression>> entry : 
argMinMaxFunctionMap.entrySet()) {
+      Literal functionID = new Literal();
+      functionID.setLongValue(argMinMaxFunctionIDMap.get(entry.getKey()));
+      Literal numMeasuringColumns = new Literal();
+      numMeasuringColumns.setLongValue(entry.getKey().size());
+
+      Function parentFunction = new Function(isMax ? ARG_MAX_PARENT : 
ARG_MIN_PARENT);
+      parentFunction.addToOperands(new 
Expression(ExpressionType.LITERAL).setLiteral(functionID));
+      parentFunction.addToOperands(new 
Expression(ExpressionType.LITERAL).setLiteral(numMeasuringColumns));
+      for (Expression expression : entry.getKey()) {
+        parentFunction.addToOperands(expression);
+      }
+      for (Expression expression : entry.getValue()) {
+        parentFunction.addToOperands(expression);
+      }
+      selectList.add(new 
Expression(ExpressionType.FUNCTION).setFunctionCall(parentFunction));
+    }
+  }
+
+  /**
+   * This method extracts the ARG_MIN/ARG_MAX functions from the given 
expression and rewrites the functions
+   * with the same measuring expressions to use the same function ID.
+   * @return true if the function is not duplicated, false otherwise.
+   */
+  private boolean extractAndRewriteArgMinMaxFunctions(Expression expression,
+      HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap,
+      HashMap<List<Expression>, Integer> argMaxFunctionIDMap,
+      HashMap<List<Expression>, Set<Expression>> argMinFunctionMap,
+      HashMap<List<Expression>, Integer> argMinFunctionIDMap) {
+    Function function = expression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    String functionName = function.getOperator();
+    if (!(functionName.equals("argmin") || functionName.equals("argmax"))) {

Review Comment:
   nit: use the constants that were defined here?
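
   A minimal sketch of the check written against the constants; the class and 
method names are hypothetical, and only `ARG_MIN` / `ARG_MAX` mirror what this 
file already defines:

   ```java
   // Hypothetical standalone version of the name check; not the PR's code.
   final class ArgMinMaxNameCheckSketch {
     private static final String ARG_MAX = "argmax";
     private static final String ARG_MIN = "argmin";

     // True when the operator is one of the two functions the rewriter handles.
     // Calling equals on the constant also makes the check null-safe.
     static boolean isArgMinMax(String functionName) {
       return ARG_MIN.equals(functionName) || ARG_MAX.equals(functionName);
     }
   }
   ```

   The condition in the diff would then read `if (!isArgMinMax(functionName))`, 
or inline the two comparisons against the constants.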



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with the same measuring expressions
+ * are consolidated and added as a single function with a list of projection expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the projection expression,
+ * used as column key for result column filler.
+ * Latter, the aggregation, result of the consolidated function will be filled into the corresponding

Review Comment:
   nit: Latter -> Later



##########
pinot-core/src/test/java/org/apache/pinot/queries/ArgMinMaxTest.java:
##########
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Queries test for histogram queries.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ArgMinMaxTest extends BaseQueriesTest {

Review Comment:
   Can you add a few tests with `AS` to ensure that some kind of meaningful 
exception is thrown? Or at least show what the behavior will be with aliases? 



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with the same measuring expressions
+ * are consolidated and added as a single function with a list of projection expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the projection expression,
+ * used as column key for result column filler.
+ * Latter, the aggregation, result of the consolidated function will be filled into the corresponding
+ * columns of the original ARG_MIN/ARG_MAX. For more syntax details please refer to ParentAggregationFunction,
+ * ChildAggregationFunction and ChildAggregationResultRewriter.
+ */
+public class ArgMinMaxRewriter implements QueryRewriter {
+
+  private static final String ARG_MAX = "argmax";
+  private static final String ARG_MIN = "argmin";
+
+  private static final String ARG_MAX_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + ARG_MAX;
+  private static final String ARG_MIN_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + ARG_MIN;
+
+  @Override
+  public PinotQuery rewrite(PinotQuery pinotQuery) {
+    // This map stores the mapping from the list of measuring expressions to the set of projection expressions
+    HashMap<List<Expression>, Set<Expression>> argMinFunctionMap = new HashMap<>();
+    // This map stores the mapping from the list of measuring expressions to the function ID
+    HashMap<List<Expression>, Integer> argMinFunctionIDMap = new HashMap<>();
+
+    HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap = new HashMap<>();
+    HashMap<List<Expression>, Integer> argMaxFunctionIDMap = new HashMap<>();
+
+    Iterator<Expression> iterator = pinotQuery.getSelectList().iterator();
+    while (iterator.hasNext()) {
+      boolean added = extractAndRewriteArgMinMaxFunctions(iterator.next(), argMaxFunctionMap, argMaxFunctionIDMap,
+          argMinFunctionMap, argMinFunctionIDMap);
+      // Remove the original function if it is not added, meaning it is a duplicate
+      if (!added) {
+        iterator.remove();
+      }
+    }
+
+    appendParentArgMinMaxFunctions(false, pinotQuery.getSelectList(), argMinFunctionMap, argMinFunctionIDMap);
+    appendParentArgMinMaxFunctions(true, pinotQuery.getSelectList(), argMaxFunctionMap, argMaxFunctionIDMap);
+
+    return pinotQuery;
+  }
+
+  /**
+   * This method appends the consolidated ARG_MIN/ARG_MAX functions to the end of the selection list.
+   * The consolidated function call will be in the following format:
+   * ARG_MAX(functionID, numMeasuringColumns, measuringColumn1, measuringColumn2, ...,
+   *  projectionColumn1, projectionColumn2, ...)
+   *  where functionID is the ID of the consolidated function, numMeasuringColumns is the number of measuring
+   *  columns, measuringColumn1, measuringColumn2, ... are the measuring columns, and projectionColumn1,
+   *  projectionColumn2, ... are the projection columns.
+   *  The number of projection columns is the same as the number of ARG_MIN/ARG_MAX functions with the same
+   *  measuring columns.

Review Comment:
   nit: can you fix the comment indentation?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ParentAggregationResultRewriter.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Use the result of parent aggregation functions to populate the result of child aggregation functions.
+ * This implementation is based on the column names of the result schema.
+ * The result column name of a parent aggregation function has the following format:
+ * CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + aggregationFunctionType + FunctionID
+ * The result column name of corresponding child aggregation function has the following format:
+ * aggregationFunctionType + FunctionID + CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ * + childFunctionKey
+ * This approach will not work with `AS` clauses as they alter the column names.

Review Comment:
   What happens if someone specifies `AS` in these queries?
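
To make the concern concrete, here is a minimal sketch of name-based classification. It assumes that an `AS` alias replaces the column name in the result schema, which is not confirmed by this diff. The prefixes `"parent"` and `"child"` are hypothetical stand-ins for the `RewriterConstants` values, and `AliasSketch` is not a Pinot class.

```java
import java.util.List;

public class AliasSketch {
  // Hypothetical stand-ins for PARENT_AGGREGATION_NAME_PREFIX and CHILD_AGGREGATION_NAME_PREFIX.
  static final String PARENT_PREFIX = "parent";
  static final String CHILD_PREFIX = "child";

  /** Classifies a result column purely by its name, as the rewriter's javadoc describes. */
  public static String classify(String columnName) {
    if (columnName.startsWith(PARENT_PREFIX)) {
      return "PARENT";
    }
    if (columnName.startsWith(CHILD_PREFIX)) {
      return "CHILD";
    }
    return "REGULAR";
  }

  public static void main(String[] args) {
    // "minCol3" stands for a child column that was renamed by an AS clause (assumed behavior).
    for (String name : List.of("parentargmin0", "childargmin(col1,col3)", "minCol3")) {
      System.out.println(name + " -> " + classify(name));
    }
  }
}
```

Under that assumption the aliased column would be treated as a regular column and would keep the child's placeholder value, so it seems worth either rejecting `AS` explicitly or documenting the resulting behavior.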



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ResultRewriter.java:
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * Interface for rewriting the result of a query

Review Comment:
   Does it make sense to call out somewhere what kind of queries can invoke the 
`ResultRewriter`? For example, as part of this PR this is invoked for group by 
and aggregations only.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxObject.java:
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+
+
+public class ArgMinMaxObject implements ParentAggregationFunctionResultObject {
+
+  // if the object is created but not yet populated, this happens e.g. when a server has no data for
+  // the query and returns a default value
+  public static final int NOT_NULL_OBJECT = 1;
+  public static final int IS_NULL_OBJECT = 0;
+  // if the object contains non null values
+  private boolean _isNull;
+
+  // if the value is stored in a mutable list, this is true only when the Object is deserialized
+  // from a byte buffer
+  private boolean _mutable;
+
+  // the schema of the measuring columns
+  private final DataSchema _keySchema;
+  // the schema of the projection columns
+  private final DataSchema _valSchema;
+
+  // the size of the extremum key cols and value clos

Review Comment:
   nit: clos -> cols
   



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ParentAggregationResultRewriter.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Use the result of parent aggregation functions to populate the result of child aggregation functions.
+ * This implementation is based on the column names of the result schema.
+ * The result column name of a parent aggregation function has the following format:
+ * CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + aggregationFunctionType + FunctionID
+ * The result column name of corresponding child aggregation function has the following format:
+ * aggregationFunctionType + FunctionID + CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ * + childFunctionKey
+ * This approach will not work with `AS` clauses as they alter the column names.
+ * TODO: Add support for `AS` clauses.
+ */
+public class ParentAggregationResultRewriter implements ResultRewriter {
+  public ParentAggregationResultRewriter() {
+  }
+
+  public static Map<String, ChildFunctionMapping> createChildFunctionMapping(DataSchema schema, Object[] row) {
+    Map<String, ChildFunctionMapping> childFunctionMapping = new HashMap<>();
+    for (int i = 0; i < schema.size(); i++) {
+      String columnName = schema.getColumnName(i);
+      if (columnName.startsWith(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX)) {
+        ParentAggregationFunctionResultObject parent = (ParentAggregationFunctionResultObject) row[i];
+
+        DataSchema nestedSchema = parent.getSchema();
+        for (int j = 0; j < nestedSchema.size(); j++) {
+          String childColumnKey = nestedSchema.getColumnName(j);
+          String originalChildFunctionKey =
+              columnName.substring(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX.length())
+                  + CommonConstants.RewriterConstants.CHILD_KEY_SEPERATOR + childColumnKey;
+          // aggregationFunctionType + childFunctionID + CHILD_KEY_SEPERATOR + childFunctionKeyInParent
+          childFunctionMapping.put(originalChildFunctionKey, new ChildFunctionMapping(parent, j, i));
+        }
+      }
+    }
+    return childFunctionMapping;
+  }
+
+  public RewriterResult rewrite(DataSchema dataSchema, List<Object[]> rows) {
+    int numParentAggregationFunctions = 0;
+    // Count the number of parent aggregation functions
+    for (int i = 0; i < dataSchema.size(); i++) {
+      if (dataSchema.getColumnName(i).startsWith(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX)) {
+        numParentAggregationFunctions++;
+      }
+    }
+
+    if (numParentAggregationFunctions == 0 || rows.isEmpty()) {
+      // no change to the result
+      return new RewriterResult(dataSchema, rows);
+    }
+
+    // Create a mapping from the child aggregation function name to the child aggregation function
+    Map<String, ChildFunctionMapping> childFunctionMapping = createChildFunctionMapping(dataSchema, rows.get(0));
+
+    String[] newColumnNames = new String[dataSchema.size() - numParentAggregationFunctions];
+    DataSchema.ColumnDataType[] newColumnDataTypes
+        = new DataSchema.ColumnDataType[dataSchema.size() - numParentAggregationFunctions];
+
+    // Create a mapping from the function offset in the final aggregation result
+    // to its own/parent function offset in the original aggregation result
+    Map<Integer, Integer> aggregationFunctionIndexMapping = new HashMap<>();
+    // Create a set of the result indices of the child aggregation functions
+    Set<Integer> childAggregationFunctionIndices = new HashSet<>();
+    // Create a mapping from the result aggregation function index to the nested index of the
+    // child aggregation function in the parent aggregation function
+    Map<Integer, Integer> childAggregationFunctionNestedIndexMapping = new HashMap<>();
+    // Create a set of the result indices of the parent aggregation functions
+    Set<Integer> parentAggregationFunctionIndices = new HashSet<>();
+
+    for (int i = 0, j = 0; i < dataSchema.size(); i++) {
+      String columnName = dataSchema.getColumnName(i);
+      // Skip the parent aggregation functions
+      if (columnName.startsWith(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX)) {
+        parentAggregationFunctionIndices.add(i);
+        continue;
+      }
+
+      // for child aggregation functions and regular columns in the result
+      // create a new schema and populate the new column names and data types
+      // also populate the offset mappings used to rewrite the result
+      if (columnName.startsWith(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX)) {
+        // This is a child column of a parent aggregation function
+        String childAggregationFunctionNameWithKey =
+            columnName.substring(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX.length());
+        String[] s = childAggregationFunctionNameWithKey
+            .split(CommonConstants.RewriterConstants.CHILD_AGGREGATION_SEPERATOR);
+        newColumnNames[j] = s[0];
+        ChildFunctionMapping childFunction = childFunctionMapping.get(s[1]);
+        newColumnDataTypes[j] = childFunction.getParent().getSchema()
+            .getColumnDataType(childFunction.getNestedOffset());
+
+        childAggregationFunctionNestedIndexMapping.put(j, childFunction.getNestedOffset());
+        childAggregationFunctionIndices.add(j);
+        aggregationFunctionIndexMapping.put(j, childFunction.getOffset());
+      } else {
+        // This is a regular column
+        newColumnNames[j] = columnName;
+        newColumnDataTypes[j] = dataSchema.getColumnDataType(i);
+
+        aggregationFunctionIndexMapping.put(j, i);
+      }
+      j++;
+    }
+
+    DataSchema newDataSchema = new DataSchema(newColumnNames, newColumnDataTypes);
+    List<Object[]> newRows = new ArrayList<>();
+
+    for (Object[] row : rows) {
+      int maxRows = parentAggregationFunctionIndices.stream().map(k -> {
+        ParentAggregationFunctionResultObject parentAggregationFunctionResultObject =
+            (ParentAggregationFunctionResultObject) row[k];
+        return parentAggregationFunctionResultObject.getNumberOfRows();
+      }).max(Integer::compareTo).orElse(0);
+      maxRows = maxRows == 0 ? 1 : maxRows;
+
+      List<Object[]> newRowsBuffer = new ArrayList<>();
+      for (int rowIter = 0; rowIter < maxRows; rowIter++) {
+        Object[] newRow = new Object[newDataSchema.size()];
+        for (int fieldIter = 0; fieldIter < newDataSchema.size(); fieldIter++) {
+          // If the field is a child aggregation function, extract the value from the parent result
+          if (childAggregationFunctionIndices.contains(fieldIter)) {
+            int offset = aggregationFunctionIndexMapping.get(fieldIter);
+            int nestedOffset = childAggregationFunctionNestedIndexMapping.get(fieldIter);
+            ParentAggregationFunctionResultObject parentAggregationFunctionResultObject =
+                (ParentAggregationFunctionResultObject) row[offset];
+            // If the parent result has more rows than the current row, extract the value from the row
+            if (rowIter < parentAggregationFunctionResultObject.getNumberOfRows()) {
+              newRow[fieldIter] = parentAggregationFunctionResultObject.getField(rowIter, nestedOffset);
+            } else {
+              newRow[fieldIter] = null;
+            }
+          } else { // If the field is a regular column, extract the value from the row, only the first row has value
+            if (rowIter == 0) {
+              newRow[fieldIter] = row[aggregationFunctionIndexMapping.get(fieldIter)];
+            } else {
+              newRow[fieldIter] = null;
+            }
+          }
+        }
+        newRowsBuffer.add(newRow);
+      }
+      newRows.addAll(newRowsBuffer);
+    }
+    return new RewriterResult(newDataSchema, newRows);
+  }
+
+  /**
+   * Mapping from child function key to the
+   * parent result object,
+   * offset of the parent result column in original result row,
+   * and the nested offset of the child function result in the parent data block
+   */

Review Comment:
   nit: fix formatting
   Also, an example here would make it easier to understand what `offset` and
   `nestedOffset` really are.
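
Something along these lines might serve as the example. It is only a sketch that mirrors the double loop in `createChildFunctionMapping` with plain lists in place of `DataSchema`. The class `OffsetExample`, its `Mapping` type, and the constant values `"parent"` and `"_"` are hypothetical and are not taken from the PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OffsetExample {
  // Hypothetical stand-ins for the constants in CommonConstants.RewriterConstants;
  // the real values are not shown in this diff.
  static final String PARENT_PREFIX = "parent";
  static final String KEY_SEPARATOR = "_";

  /** offset: index of the parent column in the result row; nestedOffset: index within the parent's own schema. */
  public static final class Mapping {
    public final int offset;
    public final int nestedOffset;

    Mapping(int offset, int nestedOffset) {
      this.offset = offset;
      this.nestedOffset = nestedOffset;
    }

    @Override
    public String toString() {
      return "(offset=" + offset + ", nestedOffset=" + nestedOffset + ")";
    }
  }

  /** Mirrors the double loop in createChildFunctionMapping, using plain lists instead of DataSchema. */
  public static Map<String, Mapping> build(List<String> rowColumns, Map<String, List<String>> nestedSchemas) {
    Map<String, Mapping> mapping = new LinkedHashMap<>();
    for (int i = 0; i < rowColumns.size(); i++) {
      String columnName = rowColumns.get(i);
      if (!columnName.startsWith(PARENT_PREFIX)) {
        continue;
      }
      List<String> nested = nestedSchemas.get(columnName);
      for (int j = 0; j < nested.size(); j++) {
        String key = columnName.substring(PARENT_PREFIX.length()) + KEY_SEPARATOR + nested.get(j);
        mapping.put(key, new Mapping(i, j));
      }
    }
    return mapping;
  }

  public static void main(String[] args) {
    // Result row: two child placeholders followed by the parent that holds the real values.
    List<String> rowColumns = List.of("child(col3)", "child(col4)", "parentargmin0");
    Map<String, List<String>> nestedSchemas = Map.of("parentargmin0", List.of("col3", "col4"));
    build(rowColumns, nestedSchemas).forEach((key, m) -> System.out.println(key + " -> " + m));
  }
}
```

With this input both children share `offset` 2 (the position of the parent column in the row), while `nestedOffset` is 0 for `col3` and 1 for `col4` (their positions inside the parent's nested schema).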



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxObject.java:
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+
+
+public class ArgMinMaxObject implements ParentAggregationFunctionResultObject {
+
+  // if the object is created but not yet populated, this happens e.g. when a server has no data for
+  // the query and returns a default value
+  public static final int NOT_NULL_OBJECT = 1;
+  public static final int IS_NULL_OBJECT = 0;
+  // if the object contains non null values
+  private boolean _isNull;
+
+  // if the value is stored in a mutable list, this is true only when the Object is deserialized
+  // from a byte buffer
+  private boolean _mutable;
+
+  // the schema of the measuring columns
+  private final DataSchema _keySchema;

Review Comment:
   From the variable names used in this class, it's hard to know what key and
   value are here. Maybe add a comment somewhere explaining that the measuring
   columns are the keys and the projection columns are the values, and that the
   keys are used purely for comparisons. Or consider renaming them to measuring
   and projection instead?
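
As a rough illustration of the naming I have in mind, here is a standalone sketch of an arg-min accumulator. It is not the PR's implementation: `ArgMinSketch` and its members are hypothetical, and keeping every tied row is an assumption suggested by the `List<Object[]>` field.

```java
import java.util.ArrayList;
import java.util.List;

@SuppressWarnings({"rawtypes", "unchecked"})
public class ArgMinSketch {
  // Values of the measuring columns ("keys") for the current minimum; used only for comparison.
  private Comparable[] _measuringKeys;
  // Values of the projection columns ("values") for every row that attains the current minimum.
  private final List<Object[]> _projectionRows = new ArrayList<>();

  /** Lexicographic comparison over the measuring columns. */
  static int compareKeys(Comparable[] left, Comparable[] right) {
    for (int i = 0; i < left.length; i++) {
      int result = left[i].compareTo(right[i]);
      if (result != 0) {
        return result;
      }
    }
    return 0;
  }

  public void offer(Comparable[] measuringKeys, Object[] projectionRow) {
    int result = _measuringKeys == null ? -1 : compareKeys(measuringKeys, _measuringKeys);
    if (result < 0) {
      // New minimum: replace the keys and start a fresh list of projection rows.
      _measuringKeys = measuringKeys;
      _projectionRows.clear();
      _projectionRows.add(projectionRow);
    } else if (result == 0) {
      // Tie on the measuring columns: keep this row as well (assumed behavior).
      _projectionRows.add(projectionRow);
    }
  }

  public List<Object[]> getProjectionRows() {
    return _projectionRows;
  }

  public static void main(String[] args) {
    ArgMinSketch argMin = new ArgMinSketch();
    argMin.offer(new Comparable[]{3}, new Object[]{"x"});
    argMin.offer(new Comparable[]{1}, new Object[]{"y"});
    argMin.offer(new Comparable[]{1}, new Object[]{"z"});
    argMin.offer(new Comparable[]{2}, new Object[]{"w"});
    for (Object[] row : argMin.getProjectionRows()) {
      System.out.println(row[0]);
    }
  }
}
```

The measuring keys never reach the output here; only the projection rows are returned, which is the distinction a comment or rename would make obvious.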



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxObject.java:
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+
+
+public class ArgMinMaxObject implements ParentAggregationFunctionResultObject {
+
+  // if the object is created but not yet populated, this happens e.g. when a server has no data for
+  // the query and returns a default value
+  public static final int NOT_NULL_OBJECT = 1;
+  public static final int IS_NULL_OBJECT = 0;
+  // if the object contains non null values
+  private boolean _isNull;
+
+  // if the value is stored in a mutable list, this is true only when the Object is deserialized
+  // from a byte buffer
+  private boolean _mutable;
+
+  // the schema of the measuring columns
+  private final DataSchema _keySchema;
+  // the schema of the projection columns
+  private final DataSchema _valSchema;
+
+  // the size of the extremum key cols and value clos
+  private final int _sizeOfExtremumKeys;
+  private final int _sizeOfExtremumVals;
+
+  // the current extremum keys
+  private Comparable[] _extremumKeys = null;
+  // the current extremum values
+  private final List<Object[]> _extremumValues = new ArrayList<>();
+
+  // used for ser/de
+  private DataBlock _immutableKeys;
+  private DataBlock _immutableVals;

Review Comment:
   Recommend adding a note on how these interact with `_mutable`, so that it is
   clear when one is used over the other.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ChildAggregationFunction.java:
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.DummyAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.DummyGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public abstract class ChildAggregationFunction implements AggregationFunction<Long, Long> {
+
+  private static final int CHILD_AGGREGATION_FUNCTION_ID_OFFSET = 0;
+  private static final int CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET = 1;
+  private final ExpressionContext _childFunctionKeyInParent;
+  private final List<ExpressionContext> _resultNameOperands;
+  private final ExpressionContext _childFunctionID;
+
+  ChildAggregationFunction(List<ExpressionContext> operands) {
+    _childFunctionID = operands.get(CHILD_AGGREGATION_FUNCTION_ID_OFFSET);
+    _childFunctionKeyInParent = operands.get(CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET);
+    _resultNameOperands = operands.subList(CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET + 1, operands.size());
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    ArrayList<ExpressionContext> expressionContexts = new ArrayList<>();
+    expressionContexts.add(_childFunctionID);
+    expressionContexts.add(_childFunctionKeyInParent);
+    expressionContexts.addAll(_resultNameOperands);
+    return expressionContexts;
+  }
+
+  @Override
+  public final AggregationResultHolder createAggregationResultHolder() {
+    return new DummyAggregationResultHolder();
+  }
+
+  @Override
+  public final GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new DummyGroupByResultHolder();
+  }
+
+  @Override
+  public final void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final Long extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return 0L;
+  }
+
+  @Override
+  public final Long extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return 0L;
+  }
+
+  @Override
+  public final Long merge(Long intermediateResult1, Long intermediateResult2) {
+    return 0L;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.UNKNOWN;
+  }
+
+  @Override
+  public final Long extractFinalResult(Long longValue) {
+    return 0L;
+  }
+
+  /**
+   * The name of the column as follows:
+   * CHILD_AGGREGATION_NAME_PREFIX + actual function type + operands + CHILD_AGGREGATION_SEPERATOR
+   * + actual function type + parent aggregation function id + CHILD_KEY_SEPERATOR + column key in parent function
+   */
+  @Override
+  public final String getResultColumnName() {
+    String type = getType().getName().toLowerCase();
+    return CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+        // above is the prefix for all child aggregation functions
+
+        + type + "(" + _resultNameOperands.stream().map(ExpressionContext::toString)
+        .collect(Collectors.joining(",")) + ")"
+        // above is the actual child aggregation function name we want to return to the user
+
+        + CommonConstants.RewriterConstants.CHILD_AGGREGATION_SEPERATOR
+        + type
+        + _childFunctionID.getLiteral().getStringValue()
+        + CommonConstants.RewriterConstants.CHILD_KEY_SEPERATOR
+        + _childFunctionKeyInParent.toString();
+    // above is the column key in the parent aggregation function
+  }
+
+  @Override
+  public String toExplainString() {

Review Comment:
   Recommend adding a test or two to the `ExplainPlanQueriesTest` file to test 
out the EXPLAIN PLAN output



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ParentAggregationFunction.java:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public abstract class ParentAggregationFunction<I, F extends ParentAggregationFunctionResultObject>
+    implements AggregationFunction<I, F> {
+
+  protected static final int PARENT_AGGREGATION_FUNCTION_ID_OFFSET = 0;
+  protected List<ExpressionContext> _arguments;
+
+  ParentAggregationFunction(List<ExpressionContext> arguments) {
+    _arguments = arguments;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  // The name of the column is the prefix of the parent aggregation function + the name of the
+  // aggregation function + the id of the parent aggregation function
+  @Override
+  public String getResultColumnName() {
+    return CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX
+        + getType().getName().toLowerCase()
+        + _arguments.get(PARENT_AGGREGATION_FUNCTION_ID_OFFSET).getLiteral().getIntValue();
+  }
+
+  public String toExplainString() {

Review Comment:
   Recommend adding a test or two to the `ExplainPlanQueriesTest` file to test 
out the EXPLAIN PLAN output



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ParentArgMinMaxAggregationFunction.java:
##########
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxObject;
+import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxWrapperValSet;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class ParentArgMinMaxAggregationFunction extends ParentAggregationFunction<ArgMinMaxObject, ArgMinMaxObject> {
+
+  // list of columns that we do min/max on
+  private final List<ExpressionContext> _measuringColumns;
+  // list of columns that we project based on the min/max value
+  private final List<ExpressionContext> _projectionColumns;
+  // true if we are doing argmax, false if we are doing argmin
+  private final boolean _isMax;
+  // the id of the function, this is to associate the result of the parent aggregation function with the
+  // child aggregation functions having the same type(argmin/argmax) and measuring columns
+  private final ExpressionContext _functionIdContext;
+  private final ExpressionContext _numMeasuringColumnContext;
+  // number of columns that we do min/max on
+  private final int _numMeasuringColumns;
+  // number of columns that we project based on the min/max value
+  private final int _numProjectionColumns;
+
+  // The following variable need to be initialized
+
+  // The wrapper classes for the block value sets
+  private final ThreadLocal<List<ArgMinMaxWrapperValSet>> _argMinMaxWrapperMeasuringColumnSets =
+      ThreadLocal.withInitial(ArrayList::new);
+  private final ThreadLocal<List<ArgMinMaxWrapperValSet>> _argMinMaxWrapperProjectionColumnSets =
+      ThreadLocal.withInitial(ArrayList::new);
+  // The schema for the measuring columns and projection columns
+  private final ThreadLocal<DataSchema> _measuringColumnSchema = new ThreadLocal<>();
+  private final ThreadLocal<DataSchema> _projectionColumnSchema = new ThreadLocal<>();
+  // If the schemas are initialized
+  private final ThreadLocal<Boolean> _schemaInitiated = ThreadLocal.withInitial(() -> false);
+
+  public ParentArgMinMaxAggregationFunction(List<ExpressionContext> arguments, boolean isMax) {
+
+    super(arguments);
+    _isMax = isMax;
+    _functionIdContext = arguments.get(0);
+
+    _numMeasuringColumnContext = arguments.get(1);
+    _numMeasuringColumns = _numMeasuringColumnContext.getLiteral().getIntValue();
+
+    _measuringColumns = arguments.subList(2, 2 + _numMeasuringColumns);
+    _projectionColumns = arguments.subList(2 + _numMeasuringColumns, arguments.size());
+    _numProjectionColumns = _projectionColumns.size();
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return _isMax ? AggregationFunctionType.ARGMAX : AggregationFunctionType.ARGMIN;
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    ArrayList<ExpressionContext> expressionContexts = new ArrayList<>();
+    expressionContexts.add(_functionIdContext);
+    expressionContexts.add(_numMeasuringColumnContext);
+    expressionContexts.addAll(_measuringColumns);
+    expressionContexts.addAll(_projectionColumns);
+    return expressionContexts;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @SuppressWarnings("LoopStatementThatDoesntLoop")
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    ArgMinMaxObject argMinMaxObject = aggregationResultHolder.getResult();
+
+    if (argMinMaxObject == null) {
+      initializeWithNewDataBlocks(blockValSetMap);
+      argMinMaxObject = new ArgMinMaxObject(_measuringColumnSchema.get(), _projectionColumnSchema.get());
+    }
+
+    List<Integer> rowIds = new ArrayList<>();
+    for (int i = 0; i < length; i++) {
+      int compareResult = argMinMaxObject.compareAndSetKey(_argMinMaxWrapperMeasuringColumnSets.get(), i, _isMax);
+      if (compareResult == 0) {
+        // same key, add the rowId to the list
+        rowIds.add(i);
+      } else if (compareResult > 0) {
+        // new key is set, clear the list and add the new rowId
+        rowIds.clear();
+        rowIds.add(i);
+      }
+    }
+
+    // for all the rows that are associated with the extremum key, add the projection columns
+    for (Integer rowId : rowIds) {
+      argMinMaxObject.addVal(_argMinMaxWrapperProjectionColumnSets.get(), rowId);
+    }
+
+    aggregationResultHolder.setValue(argMinMaxObject);
+  }
+
+  // this method is called to initialize the schemas if they are not initialized
+  // and to set the new block value sets for the wrapper classes
+  private void initializeWithNewDataBlocks(Map<ExpressionContext, BlockValSet> blockValSetMap) {

Review Comment:
   This function is quite large and hard to read. Can you modularize it a bit, 
if possible?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxWrapperValSet.java:
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+
+
+public class ArgMinMaxWrapperValSet {

Review Comment:
   Is my understanding correct that the comparator functions are used only for 
measuring columns and the value-related functions only for projection 
columns? It might be cleaner to have a base class and extend it, adding the 
comparator functions for measuring columns and the value functions for 
projection columns, so that the responsibility of each API is clear. Not a 
must-do, but for someone reading this class without looking at its usage, 
the APIs are a bit confusing.
   
   E.g. `getComparable` and `getValue` return the same underlying arrays.
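
   A rough sketch of the split I have in mind. All class and method names here are hypothetical (they are not in the PR), and a plain `Object[]` stands in for the typed arrays read from a `BlockValSet`; it only illustrates how the two responsibilities could be separated:

```java
public class WrapperSplitSketch {
  // Hypothetical base class: owns the underlying values of one column.
  abstract static class BaseColumnWrapper {
    protected final Object[] _values;

    BaseColumnWrapper(Object[] values) {
      _values = values;
    }
  }

  // Hypothetical wrapper for measuring columns: exposes comparison only.
  static final class MeasuringColumnWrapper extends BaseColumnWrapper {
    MeasuringColumnWrapper(Comparable<?>[] values) {
      super(values);
    }

    // Compares the value at rowId with another key of the same type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    int compare(int rowId, Comparable other) {
      return ((Comparable) _values[rowId]).compareTo(other);
    }
  }

  // Hypothetical wrapper for projection columns: exposes value access only.
  static final class ProjectionColumnWrapper extends BaseColumnWrapper {
    ProjectionColumnWrapper(Object[] values) {
      super(values);
    }

    Object getValue(int rowId) {
      return _values[rowId];
    }
  }

  public static void main(String[] args) {
    MeasuringColumnWrapper measuring = new MeasuringColumnWrapper(new Integer[]{3, 7, 5});
    ProjectionColumnWrapper projection = new ProjectionColumnWrapper(new String[]{"a", "b", "c"});
    // Row 1 holds a larger key than 5, so its projected value is the candidate.
    if (measuring.compare(1, 5) > 0) {
      System.out.println(projection.getValue(1));
    }
  }
}
```

   With this shape, a reader can tell from the type alone which operations apply to which kind of column.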



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ParentAggregationFunction.java:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public abstract class ParentAggregationFunction<I, F extends ParentAggregationFunctionResultObject>
+    implements AggregationFunction<I, F> {
+
+  protected static final int PARENT_AGGREGATION_FUNCTION_ID_OFFSET = 0;
+  protected List<ExpressionContext> _arguments;
+
+  ParentAggregationFunction(List<ExpressionContext> arguments) {
+    _arguments = arguments;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  // The name of the column is the prefix of the parent aggregation function + the name of the
+  // aggregation function + the id of the parent aggregation function

Review Comment:
   An example would be useful here.
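
   Something along these lines, as a minimal sketch. The prefix value below is a placeholder assumption, since the actual value of `CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX` is not shown in this diff, and the helper method is hypothetical; it only mirrors the concatenation done in `getResultColumnName()`:

```java
public class ParentColumnNameExample {
  // Placeholder standing in for CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX;
  // the real constant value is an assumption here.
  static final String PARENT_AGGREGATION_NAME_PREFIX = "pinotparentagg";

  // Mirrors getResultColumnName(): prefix + lower-cased function name + function id.
  static String parentResultColumnName(String functionTypeName, int functionId) {
    return PARENT_AGGREGATION_NAME_PREFIX + functionTypeName.toLowerCase() + functionId;
  }

  public static void main(String[] args) {
    // For an argmax parent function with id 0, under the placeholder prefix above:
    System.out.println(parentResultColumnName("ARGMAX", 0)); // prints "pinotparentaggargmax0"
  }
}
```

   A one-line example of this kind in the comment would make the naming scheme easier to follow.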



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DummyAggregationResultHolder.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.groupby;
+
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+
+
+/**
+ * Placeholder AggregationResultHolder that does noop

Review Comment:
   Maybe add a comment on where this placeholder is used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



