This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ad32a2a5ec adding in AggregateNode and related contents. (#8946)
ad32a2a5ec is described below

commit ad32a2a5ec113165217066ff9dd2a8b5d297ead1
Author: Rong Rong <walterddr.walter...@gmail.com>
AuthorDate: Thu Jun 30 12:44:35 2022 -0700

    adding in AggregateNode and related contents. (#8946)
    
    - adding agg split rules
      - adding agg split rules for leaf-intermediate split
      - adding agg operator conversion from input groups to intermediate
        group
      - support agg after JOIN as well
    
    - adding in agg without group by as well
      - fix add vs. replace selectList
      - also support multi-column group by key
    
    - misc
      - support multi-column JOIN
      - also fix hash distribution rule
      - validate that transform actually works with group-by
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/common/utils/request/RequestUtils.java   |   2 +-
 .../tests/MultiStageEngineIntegrationTest.java     |   7 +-
 .../query/parser/CalciteRexExpressionParser.java   |  34 +++-
 .../query/planner/hints/PinotRelationalHints.java  |   2 +
 .../query/planner/logical/RelToStageConverter.java |   8 +
 .../pinot/query/planner/logical/RexExpression.java |  17 +-
 .../pinot/query/planner/logical/StagePlanner.java  |   3 +-
 .../partitioning/FieldSelectionKeySelector.java    |   2 +-
 .../pinot/query/planner/stage/AggregateNode.java   |  58 ++++++
 .../query/planner/stage/StageNodeSerDeUtils.java   |   2 +
 .../PinotAggregateExchangeNodeInsertRule.java      | 181 ++++++++++++++++++
 ...e.java => PinotJoinExchangeNodeInsertRule.java} |  10 +-
 .../pinot/query/rules/PinotQueryRuleSets.java      |   3 +-
 .../pinot/query/QueryEnvironmentTestBase.java      |  10 +-
 .../runtime/executor/WorkerQueryExecutor.java      |   7 +
 .../query/runtime/operator/AggregateOperator.java  | 211 +++++++++++++++++++++
 .../query/runtime/utils/ServerRequestUtils.java    |  10 +-
 .../apache/pinot/query/QueryServerEnclosure.java   |   2 +-
 .../pinot/query/runtime/QueryRunnerTest.java       |  22 ++-
 19 files changed, 565 insertions(+), 26 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index 32afe620fc..4f0f7bcca8 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -114,7 +114,7 @@ public class RequestUtils {
   }
 
   public static Expression getFunctionExpression(String canonicalName) {
-    assert 
canonicalName.equals(canonicalizeFunctionNamePreservingSpecialKey(canonicalName));
+    assert 
canonicalName.equalsIgnoreCase(canonicalizeFunctionNamePreservingSpecialKey(canonicalName));
     Expression expression = new Expression(ExpressionType.FUNCTION);
     Function function = new Function(canonicalName);
     expression.setFunctionCall(function);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 5d149de4ed..0d78d2b182 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -113,10 +113,11 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTest
   @DataProvider
   public Object[][] multiStageQueryEngineSqlTestSet() {
     return new Object[][] {
-        new Object[]{"SELECT * FROM mytable_OFFLINE", 10, 73},
+        new Object[]{"SELECT COUNT(*) FROM mytable_OFFLINE WHERE 
Carrier='AA'", 1, 1},
+        new Object[]{"SELECT * FROM mytable_OFFLINE WHERE ArrDelay>1000", 2, 
73},
         new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE WHERE 
CarrierDelay=15 AND ArrDelay>20", 10, 2},
-        new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE 
AS b ON a.AirlineID = b.AirlineID "
-            + " WHERE a.CarrierDelay=15 AND a.ArrDelay>20 AND b.ArrDelay<20", 
10, 146}
+        new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE 
AS b ON a.Origin = b.Origin "
+            + " WHERE a.Carrier='AA' AND a.ArrDelay>1000 AND b.ArrDelay>1000", 
2, 146}
     };
   }
 
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
index a4f5753557..70ee68b21c 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
@@ -53,8 +53,13 @@ public class CalciteRexExpressionParser {
   // Relational conversion Utils
   // --------------------------------------------------------------------------
 
-  public static List<Expression> convertSelectList(List<RexExpression> 
rexNodeList, PinotQuery pinotQuery) {
-    List<Expression> selectExpr = new ArrayList<>();
+  public static List<Expression> overwriteSelectList(List<RexExpression> 
rexNodeList, PinotQuery pinotQuery) {
+    return addSelectList(new ArrayList<>(), rexNodeList, pinotQuery);
+  }
+
+  public static List<Expression> addSelectList(List<Expression> existingList, 
List<RexExpression> rexNodeList,
+      PinotQuery pinotQuery) {
+    List<Expression> selectExpr = new ArrayList<>(existingList);
 
     final Iterator<RexExpression> iterator = rexNodeList.iterator();
     while (iterator.hasNext()) {
@@ -65,6 +70,18 @@ public class CalciteRexExpressionParser {
     return selectExpr;
   }
 
+  public static List<Expression> convertGroupByList(List<RexExpression> 
rexNodeList, PinotQuery pinotQuery) {
+    List<Expression> groupByExpr = new ArrayList<>();
+
+    final Iterator<RexExpression> iterator = rexNodeList.iterator();
+    while (iterator.hasNext()) {
+      final RexExpression next = iterator.next();
+      groupByExpr.add(toExpression(next, pinotQuery));
+    }
+
+    return groupByExpr;
+  }
+
   private static List<Expression> 
convertDistinctSelectList(RexExpression.FunctionCall rexCall, PinotQuery 
pinotQuery) {
     List<Expression> selectExpr = new ArrayList<>();
     selectExpr.add(convertDistinctAndSelectListToFunctionExpression(rexCall, 
pinotQuery));
@@ -169,7 +186,7 @@ public class CalciteRexExpressionParser {
       operands.add(toExpression(childNode, pinotQuery));
     }
     ParserUtils.validateFunction(functionName, operands);
-    Expression functionExpression = 
RequestUtils.getFunctionExpression(functionName);
+    Expression functionExpression = 
RequestUtils.getFunctionExpression(canonicalizeFunctionName(functionName));
     functionExpression.getFunctionCall().setOperands(operands);
     return functionExpression;
   }
@@ -209,4 +226,15 @@ public class CalciteRexExpressionParser {
     andExpression.getFunctionCall().setOperands(operands);
     return andExpression;
   }
+
+  /**
+   * Canonicalize Calcite generated Logical function names.
+   */
+  private static String canonicalizeFunctionName(String functionName) {
+    if (functionName.endsWith("0")) {
+      return functionName.substring(0, functionName.length() - 1);
+    } else {
+      return functionName;
+    }
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java
index 19a9daa54f..2c4cb976a6 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java
@@ -27,6 +27,8 @@ import org.apache.calcite.rel.hint.RelHint;
 public class PinotRelationalHints {
   public static final RelHint USE_HASH_DISTRIBUTE = 
RelHint.builder("USE_HASH_DISTRIBUTE").build();
   public static final RelHint USE_BROADCAST_DISTRIBUTE = 
RelHint.builder("USE_BROADCAST_DISTRIBUTE").build();
+  public static final RelHint AGG_INTERMEDIATE_STAGE = 
RelHint.builder("AGG_INTERMEDIATE_STAGE").build();
+  public static final RelHint AGG_LEAF_STAGE = 
RelHint.builder("AGG_LEAF_STAGE").build();
 
   private PinotRelationalHints() {
     // do not instantiate.
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index bc6d7dc4ca..23f8fb6db8 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
@@ -32,6 +33,7 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexCall;
 import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.ProjectNode;
@@ -65,11 +67,17 @@ public final class RelToStageConverter {
       return convertLogicalProject((LogicalProject) node, currentStageId);
     } else if (node instanceof LogicalFilter) {
       return convertLogicalFilter((LogicalFilter) node, currentStageId);
+    } else if (node instanceof LogicalAggregate) {
+      return convertLogicalAggregate((LogicalAggregate) node, currentStageId);
     } else {
       throw new UnsupportedOperationException("Unsupported logical plan node: 
" + node);
     }
   }
 
+  private static StageNode convertLogicalAggregate(LogicalAggregate node, int 
currentStageId) {
+    return new AggregateNode(currentStageId, node.getAggCallList(), 
node.getGroupSet());
+  }
+
   private static StageNode convertLogicalProject(LogicalProject node, int 
currentStageId) {
     return new ProjectNode(currentStageId, node.getProjects());
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
index 17e472c811..778907e40a 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.logical;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
@@ -62,18 +63,24 @@ public interface RexExpression {
     }
   }
 
+  static RexExpression toRexExpression(AggregateCall aggCall) {
+    List<RexExpression> operands = 
aggCall.getArgList().stream().map(InputRef::new).collect(Collectors.toList());
+    return new RexExpression.FunctionCall(aggCall.getAggregation().getKind(), 
toDataType(aggCall.getType()),
+        aggCall.getAggregation().getName(), operands);
+  }
+
   static Object toRexValue(FieldSpec.DataType dataType, Comparable value) {
     switch (dataType) {
       case INT:
-        return ((BigDecimal) value).intValue();
+        return value == null ? 0 : ((BigDecimal) value).intValue();
       case LONG:
-        return ((BigDecimal) value).longValue();
+        return value == null ? 0L : ((BigDecimal) value).longValue();
       case FLOAT:
-        return ((BigDecimal) value).floatValue();
+        return value == null ? 0f : ((BigDecimal) value).floatValue();
       case DOUBLE:
-        return ((BigDecimal) value).doubleValue();
+        return value == null ? 0d : ((BigDecimal) value).doubleValue();
       case STRING:
-        return ((NlsString) value).getValue();
+        return value == null ? "" : ((NlsString) value).getValue();
       default:
         return value;
     }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index db7fd63d83..8d9bbbc51d 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -99,13 +99,14 @@ public class StagePlanner {
       // 1. exchangeNode always have only one input, get its input converted 
as a new stage root.
       StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
       RelDistribution distribution = ((LogicalExchange) 
node).getDistribution();
+      List<Integer> distributionKeys = distribution.getKeys();
       RelDistribution.Type exchangeType = distribution.getType();
 
       // 2. make an exchange sender and receiver node pair
       StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, 
nextStageRoot.getStageId(), exchangeType);
       StageNode mailboxSender = new 
MailboxSendNode(nextStageRoot.getStageId(), mailboxReceiver.getStageId(),
           exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
-          ? new FieldSelectionKeySelector(distribution.getKeys().get(0)) : 
null);
+          ? new FieldSelectionKeySelector(distributionKeys) : null);
       mailboxSender.addInput(nextStageRoot);
 
       // 3. put the sender side as a completed stage.
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
index 674cc8e2a2..fd04ca589e 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
@@ -71,6 +71,6 @@ public class FieldSelectionKeySelector implements 
KeySelector<Object[], Object[]
     for (int columnIndex : _columnIndices) {
       hashCodeBuilder.append(input[columnIndex]);
     }
-    return hashCodeBuilder.toHashCode();
+    return Math.abs(hashCodeBuilder.toHashCode());
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
new file mode 100644
index 0000000000..ae41d14a79
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.serde.ProtoProperties;
+
+
+public class AggregateNode extends AbstractStageNode {
+  @ProtoProperties
+  private List<RexExpression> _aggCalls;
+  @ProtoProperties
+  private List<RexExpression> _groupSet;
+
+  public AggregateNode(int stageId) {
+    super(stageId);
+  }
+
+  public AggregateNode(int stageId, List<AggregateCall> aggCalls, 
ImmutableBitSet groupSet) {
+    super(stageId);
+    _aggCalls = 
aggCalls.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
+    _groupSet = new ArrayList<>(groupSet.cardinality());
+    Iterator<Integer> groupSetIt = groupSet.iterator();
+    while (groupSetIt.hasNext()) {
+      _groupSet.add(new RexExpression.InputRef(groupSetIt.next()));
+    }
+  }
+
+  public List<RexExpression> getAggCalls() {
+    return _aggCalls;
+  }
+
+  public List<RexExpression> getGroupSet() {
+    return _groupSet;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
index 3d34f6effb..8d341a207c 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
@@ -56,6 +56,8 @@ public final class StageNodeSerDeUtils {
         return new ProjectNode(stageId);
       case "FilterNode":
         return new FilterNode(stageId);
+      case "AggregateNode":
+        return new AggregateNode(stageId);
       case "MailboxSendNode":
         return new MailboxSendNode(stageId);
       case "MailboxReceiveNode":
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
new file mode 100644
index 0000000000..1150bc2085
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.rules;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.pinot.query.planner.hints.PinotRelationalHints;
+
+
+/**
+ * Special rule for Pinot, this rule is fixed to generate a 2-stage 
aggregation split between the
+ * (1) non-data-locale Pinot server agg stage, and (2) the data-locale Pinot 
intermediate agg stage.
+ *
+ * Pinot uses special intermediate data representation for partially 
aggregated results, thus we can't use
+ * {@link org.apache.calcite.rel.rules.AggregateReduceFunctionsRule} to reduce 
complex aggregation.
+ *
+ * This rule introduces Pinot-specific aggregation splits. 
In general, all aggregations are split into
+ * an intermediate-stage AGG and a server-stage AGG with the same naming. E.g.
+ *
+ * COUNT(*) transforms into: COUNT(*)_SERVER --> COUNT(*)_INTERMEDIATE, where
+ *   COUNT(*)_SERVER produces TUPLE[ SUM(1), GROUP_BY_KEY ]
+ *   COUNT(*)_INTERMEDIATE produces TUPLE[ SUM(COUNT(*)_SERVER), GROUP_BY_KEY ]
+ *
+ * However, the suffix _SERVER/_INTERMEDIATE is merely a SQL hint to the 
Aggregate operator and will be translated
+ * into the correct, actual operator chain during physical planning.
+ */
+public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
+  public static final PinotAggregateExchangeNodeInsertRule INSTANCE =
+      new PinotAggregateExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER);
+
+  public PinotAggregateExchangeNodeInsertRule(RelBuilderFactory factory) {
+    super(operand(LogicalAggregate.class, any()), factory, null);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    if (call.rels.length < 1) {
+      return false;
+    }
+    if (call.rel(0) instanceof Aggregate) {
+      Aggregate agg = call.rel(0);
+      return !agg.getHints().contains(PinotRelationalHints.AGG_LEAF_STAGE)
+          && 
!agg.getHints().contains(PinotRelationalHints.AGG_INTERMEDIATE_STAGE);
+    }
+    return false;
+  }
+
+  /**
+   * Split the AGG into 2 stages, both with the same AGG type.
+   * Pinot's internal stage optimization can use the info of the input data type 
to infer whether it should generate
+   * the "intermediate-stage AGG operator" or the "leaf-stage AGG operator".
+   * @see org.apache.pinot.core.query.aggregation.function.AggregationFunction
+   *
+   * @param call the {@link RelOptRuleCall} on match.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Aggregate oldAggRel = call.rel(0);
+    ImmutableList<RelHint> orgHints = oldAggRel.getHints();
+
+    // 1. attach leaf agg RelHint to original agg.
+    ImmutableList<RelHint> newLeafAggHints =
+        new 
ImmutableList.Builder<RelHint>().addAll(orgHints).add(PinotRelationalHints.AGG_LEAF_STAGE).build();
+    Aggregate newLeafAgg =
+        new LogicalAggregate(oldAggRel.getCluster(), oldAggRel.getTraitSet(), 
newLeafAggHints, oldAggRel.getInput(),
+            oldAggRel.getGroupSet(), oldAggRel.getGroupSets(), 
oldAggRel.getAggCallList());
+
+    // 2. attach exchange.
+    List<Integer> groupSetIndices = ImmutableIntList.range(0, 
oldAggRel.getGroupCount());
+    LogicalExchange exchange = null;
+    if (groupSetIndices.size() == 0) {
+      exchange = LogicalExchange.create(newLeafAgg, 
RelDistributions.SINGLETON);
+    } else {
+      exchange = LogicalExchange.create(newLeafAgg, 
RelDistributions.hash(groupSetIndices));
+    }
+
+    // 3. attach intermediate agg stage.
+    RelNode newAggNode = makeNewIntermediateAgg(call, oldAggRel, exchange);
+    call.transformTo(newAggNode);
+  }
+
+  private RelNode makeNewIntermediateAgg(RelOptRuleCall ruleCall, Aggregate 
oldAggRel, LogicalExchange exchange) {
+
+    // add the exchange as the input node to the relation builder.
+    RelBuilder relBuilder = ruleCall.builder();
+    relBuilder.push(exchange);
+    List<RexNode> inputExprs = new ArrayList<>(relBuilder.fields());
+
+    // make input ref to the exchange after the leaf aggregate.
+    RexBuilder rexBuilder = exchange.getCluster().getRexBuilder();
+    final int nGroups = oldAggRel.getGroupCount();
+    for (int i = 0; i < nGroups; i++) {
+      rexBuilder.makeInputRef(oldAggRel, i);
+    }
+
+    // create new aggregate function calls from exchange input.
+    List<AggregateCall> oldCalls = oldAggRel.getAggCallList();
+    List<AggregateCall> newCalls = new ArrayList<>();
+    Map<AggregateCall, RexNode> aggCallMapping = new HashMap<>();
+
+    for (int oldCallIndex = 0; oldCallIndex < oldCalls.size(); oldCallIndex++) 
{
+      AggregateCall oldCall = oldCalls.get(oldCallIndex);
+      convertAggCall(rexBuilder, oldAggRel, oldCallIndex, oldCall, newCalls, 
aggCallMapping, inputExprs);
+    }
+
+    // create new aggregate relation.
+    ImmutableList<RelHint> orgHints = oldAggRel.getHints();
+    ImmutableList<RelHint> newIntermediateAggHints =
+        new 
ImmutableList.Builder<RelHint>().addAll(orgHints).add(PinotRelationalHints.AGG_INTERMEDIATE_STAGE).build();
+    ImmutableBitSet groupSet = ImmutableBitSet.range(nGroups);
+    relBuilder.aggregate(
+        relBuilder.groupKey(groupSet, ImmutableList.of(groupSet)),
+        newCalls);
+    relBuilder.hints(newIntermediateAggHints);
+    return relBuilder.build();
+  }
+
+  /**
+   * convert aggregate call based on the intermediate stage input.
+   *
+   * <p>Note that the intermediate stage input only supports splittable 
aggregators such as SUM/MIN/MAX.
+   * All non-splittable aggregators must be converted into splittable 
aggregators first.
+   */
+  private static void convertAggCall(RexBuilder rexBuilder, Aggregate 
oldAggRel, int oldCallIndex,
+      AggregateCall oldCall, List<AggregateCall> newCalls, Map<AggregateCall, 
RexNode> aggCallMapping,
+      List<RexNode> inputExprs) {
+    final int nGroups = oldAggRel.getGroupCount();
+    AggregateCall newCall = AggregateCall.create(
+        oldCall.getAggregation(), oldCall.isDistinct(), 
oldCall.isApproximate(), oldCall.ignoreNulls(),
+        convertArgList(nGroups + oldCallIndex, oldCall.getArgList()), 
oldCall.filterArg, oldCall.distinctKeys,
+        oldCall.collation, oldCall.type, oldCall.getName());
+    rexBuilder.addAggCall(newCall,
+        nGroups,
+        newCalls,
+        aggCallMapping,
+        oldAggRel.getInput()::fieldIsNullable);
+  }
+
+  private static List<Integer> convertArgList(int oldCallIndexWithShift, 
List<Integer> argList) {
+    Preconditions.checkArgument(argList.size() <= 1,
+        "Unable to convert call as the argList contains more than 1 argument");
+    return argList.size() == 1 ? 
Collections.singletonList(oldCallIndexWithShift) : Collections.emptyList();
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
similarity index 91%
rename from 
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
rename to 
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
index 2a1b740669..6aaacccac1 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
@@ -38,13 +38,13 @@ import 
org.apache.pinot.query.planner.hints.PinotRelationalHints;
 
 
 /**
- * Special rule for Pinot, always insert exchange after JOIN
+ * Special rule for Pinot, this rule is fixed to always insert exchange after 
JOIN node.
  */
-public class PinotExchangeNodeInsertRule extends RelOptRule {
-  public static final PinotExchangeNodeInsertRule INSTANCE =
-      new PinotExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER);
+public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
+  public static final PinotJoinExchangeNodeInsertRule INSTANCE =
+      new PinotJoinExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER);
 
-  public PinotExchangeNodeInsertRule(RelBuilderFactory factory) {
+  public PinotJoinExchangeNodeInsertRule(RelBuilderFactory factory) {
     super(operand(LogicalJoin.class, any()), factory, null);
   }
 
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
index 63c2fd799f..1f3759b7b4 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
@@ -89,5 +89,6 @@ public class PinotQueryRuleSets {
           PruneEmptyRules.UNION_INSTANCE,
 
           // Pinot specific rules
-          PinotExchangeNodeInsertRule.INSTANCE);
+          PinotJoinExchangeNodeInsertRule.INSTANCE,
+          PinotAggregateExchangeNodeInsertRule.INSTANCE);
 }
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 176bf3edc8..1c6ad7ff29 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -47,7 +47,15 @@ public class QueryEnvironmentTestBase {
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 
>= 0"},
         new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = 
b.col2"},
         new Object[]{"SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = 
b.col2 "
-            + "WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0"},
+            + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0"},
+        new Object[]{"SELECT a.col1, a.col3 + a.ts FROM a WHERE a.col3 >= 0 
AND a.col2 = 'a'"},
+        new Object[]{"SELECT SUM(a.col3), COUNT(*) FROM a WHERE a.col3 >= 0 
AND a.col2 = 'a'"},
+        new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND 
a.col2 = 'a' GROUP BY a.col1"},
+        new Object[]{"SELECT a.col1, AVG(a.col3) FROM a WHERE a.col3 >= 0 AND 
a.col2 = 'a' GROUP BY a.col1"},
+        new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 
>= 0 AND a.col1 = 'a' "
+            + " GROUP BY a.col1, a.col2"},
+        new Object[]{"SELECT a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = 
b.col2 "
+            + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY 
a.col1"},
     };
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 01a97643ab..379d7bdbd6 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -30,6 +30,7 @@ import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -38,6 +39,7 @@ import org.apache.pinot.query.planner.stage.ProjectNode;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
@@ -117,6 +119,11 @@ public class WorkerQueryExecutor {
       BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, 
joinNode.getInputs().get(0), metadataMap);
       BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, 
joinNode.getInputs().get(1), metadataMap);
       return new HashJoinOperator(leftOperator, rightOperator, 
joinNode.getCriteria());
+    } else if (stageNode instanceof AggregateNode) {
+      AggregateNode aggregateNode = (AggregateNode) stageNode;
+      BaseOperator<TransferableBlock> inputOperator =
+          getOperator(requestId, aggregateNode.getInputs().get(0), 
metadataMap);
+      return new AggregateOperator(inputOperator, aggregateNode.getAggCalls(), 
aggregateNode.getGroupSet());
     } else if (stageNode instanceof FilterNode) {
       throw new UnsupportedOperationException("Unsupported!");
     } else if (stageNode instanceof ProjectNode) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
new file mode 100644
index 0000000000..95a5dcd03f
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.CountAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.MaxAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.MinAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.SumAggregationFunction;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+/**
+ *
+ */
+public class AggregateOperator extends BaseOperator<TransferableBlock> {
+  private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+
+  private BaseOperator<TransferableBlock> _inputOperator;
+  private List<RexExpression> _aggCalls;
+  private List<RexExpression> _groupSet;
+
+  private final AggregationFunction[] _aggregationFunctions;
+  private final Map<Integer, Object>[] _groupByResultHolders;
+  private final Map<Integer, Object[]> _groupByKeyHolder;
+
+  private DataSchema _dataSchema;
+  private boolean _isCumulativeBlockConstructed;
+
+  // TODO: refactor Pinot Reducer code to support the intermediate stage agg 
operator.
+  public AggregateOperator(BaseOperator<TransferableBlock> inputOperator, 
List<RexExpression> aggCalls,
+      List<RexExpression> groupSet) {
+    _inputOperator = inputOperator;
+    _aggCalls = aggCalls;
+    _groupSet = groupSet;
+
+    _aggregationFunctions = new AggregationFunction[_aggCalls.size()];
+    _groupByResultHolders = new Map[_aggCalls.size()];
+    _groupByKeyHolder = new HashMap<Integer, Object[]>();
+    for (int i = 0; i < aggCalls.size(); i++) {
+      _aggregationFunctions[i] = (toAggregationFunction(aggCalls.get(i)));
+      _groupByResultHolders[i] = new HashMap<Integer, Object>();
+    }
+
+    _isCumulativeBlockConstructed = false;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    // WorkerExecutor doesn't use getChildOperators, returns null here.
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      cumulateAggregationBlocks();
+      return new TransferableBlock(toResultBlock());
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  private BaseDataBlock toResultBlock()
+      throws IOException {
+    if (!_isCumulativeBlockConstructed) {
+      List<Object[]> rows = new ArrayList<>(_groupByKeyHolder.size());
+      for (Map.Entry<Integer, Object[]> e : _groupByKeyHolder.entrySet()) {
+        Object[] row = new Object[_aggregationFunctions.length + 
_groupSet.size()];
+        Object[] keyElements = e.getValue();
+        for (int i = 0; i < keyElements.length; i++) {
+          row[i] = keyElements[i];
+        }
+        for (int i = 0; i < _groupByResultHolders.length; i++) {
+          row[i + _groupSet.size()] = _groupByResultHolders[i].get(e.getKey());
+        }
+        rows.add(row);
+      }
+      _isCumulativeBlockConstructed = true;
+      if (rows.size() == 0) {
+        return DataBlockUtils.getEmptyDataBlock(_dataSchema);
+      } else {
+        return DataBlockBuilder.buildFromRows(rows, null, _dataSchema);
+      }
+    } else {
+      return DataBlockUtils.getEndOfStreamDataBlock();
+    }
+  }
+
+  private void cumulateAggregationBlocks() {
+    TransferableBlock block = _inputOperator.nextBlock();
+    while (!TransferableBlockUtils.isEndOfStream(block)) {
+      BaseDataBlock dataBlock = block.getDataBlock();
+      if (_dataSchema == null) {
+        _dataSchema = dataBlock.getDataSchema();
+      }
+      int numRows = dataBlock.getNumberOfRows();
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        Object[] row = 
SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+        Key key = extraRowKey(row, _groupSet);
+        int keyHashCode = key.hashCode();
+        _groupByKeyHolder.put(keyHashCode, key.getValues());
+        for (int i = 0; i < _aggregationFunctions.length; i++) {
+          Object currentRes = _groupByResultHolders[i].get(keyHashCode);
+          if (currentRes == null) {
+            _groupByResultHolders[i].put(keyHashCode, row[i + 
_groupSet.size()]);
+          } else {
+            _groupByResultHolders[i].put(keyHashCode,
+                merge(_aggCalls.get(i), currentRes, row[i + 
_groupSet.size()]));
+          }
+        }
+      }
+      block = _inputOperator.nextBlock();
+    }
+  }
+
+  private AggregationFunction toAggregationFunction(RexExpression aggCall) {
+    Preconditions.checkState(aggCall instanceof RexExpression.FunctionCall);
+    switch (((RexExpression.FunctionCall) aggCall).getFunctionName()) {
+      case "$SUM":
+      case "$SUM0":
+        return new SumAggregationFunction(
+            ExpressionContext.forIdentifier(
+                ((RexExpression.FunctionCall) 
aggCall).getFunctionOperands().get(0).toString()));
+      case "$COUNT":
+      case "COUNT":
+        return new CountAggregationFunction();
+      case "$MIN":
+      case "$MIN0":
+        return new MinAggregationFunction(
+            ExpressionContext.forIdentifier(
+                ((RexExpression.FunctionCall) 
aggCall).getFunctionOperands().get(0).toString()));
+      case "$MAX":
+      case "$MAX0":
+        return new MaxAggregationFunction(
+            ExpressionContext.forIdentifier(
+                ((RexExpression.FunctionCall) 
aggCall).getFunctionOperands().get(0).toString()));
+      default:
+        throw new IllegalStateException(
+            "Unexpected value: " + ((RexExpression.FunctionCall) 
aggCall).getFunctionName());
+    }
+  }
+
+  private Object merge(RexExpression aggCall, Object left, Object right) {
+    Preconditions.checkState(aggCall instanceof RexExpression.FunctionCall);
+    switch (((RexExpression.FunctionCall) aggCall).getFunctionName()) {
+      case "$SUM":
+      case "$SUM0":
+        return (double) left + (double) right;
+      case "$COUNT":
+        return (int) left + (int) right;
+      case "$MIN":
+      case "$MIN0":
+        return Math.min((double) left, (double) right);
+      case "$MAX":
+      case "$MAX0":
+        return Math.max((double) left, (double) right);
+      default:
+        throw new IllegalStateException(
+            "Unexpected value: " + ((RexExpression.FunctionCall) 
aggCall).getFunctionName());
+    }
+  }
+
+  private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
+    Object[] keyElements = new Object[groupSet.size()];
+    for (int i = 0; i < groupSet.size(); i++) {
+      keyElements[i] = row[((RexExpression.InputRef) 
groupSet.get(i)).getIndex()];
+    }
+    return new Key(keyElements);
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 86ec13adcf..15a8b0194c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.request.QuerySource;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
 import org.apache.pinot.query.planner.stage.ProjectNode;
@@ -104,8 +105,15 @@ public class ServerRequestUtils {
       pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression(
           ((FilterNode) node).getCondition(), pinotQuery));
     } else if (node instanceof ProjectNode) {
-      pinotQuery.setSelectList(CalciteRexExpressionParser.convertSelectList(
+      pinotQuery.setSelectList(CalciteRexExpressionParser.overwriteSelectList(
           ((ProjectNode) node).getProjects(), pinotQuery));
+    } else if (node instanceof AggregateNode) {
+      // set agg list
+      
pinotQuery.setSelectList(CalciteRexExpressionParser.addSelectList(pinotQuery.getSelectList(),
+          ((AggregateNode) node).getAggCalls(), pinotQuery));
+      // set group-by list
+      pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList(
+          ((AggregateNode) node).getGroupSet(), pinotQuery));
     } else if (node instanceof MailboxSendNode) {
       // TODO: MailboxSendNode should be the root of the leaf stage. but 
ignore for now since it is handle seperately
       // in QueryRunner as a single step sender.
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 0e60a4a533..2ef4958432 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -142,7 +142,7 @@ public class QueryServerEnclosure {
     for (int i = 0; i < NUM_ROWS; i++) {
       GenericRow row = new GenericRow();
       row.putValue("col1", STRING_FIELD_LIST[i % STRING_FIELD_LIST.length]);
-      row.putValue("col2", STRING_FIELD_LIST[(i + 2) % 
STRING_FIELD_LIST.length]);
+      row.putValue("col2", STRING_FIELD_LIST[i % (STRING_FIELD_LIST.length - 
2)]);
       row.putValue("col3", INT_FIELD_LIST[i % INT_FIELD_LIST.length]);
       row.putValue("ts", System.currentTimeMillis());
       rows.add(row);
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 6ae43ce0cd..b409a4b063 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -153,11 +153,15 @@ public class QueryRunnerTest {
         // Next join with table C which has (5 on server1 and 10 on server2), 
since data is identical. each of the row
         // of the A JOIN B will have identical value of col3 as table C.col3 
has. Since the values are cycling between
         // (1, 2, 42, 1, 2). we will have 6 1s, 6 2s, and 3 42s, total result 
count will be 36 + 36 + 9 = 81
-        new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 JOIN c ON 
a.col3 = c.col3", 81},
+        new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON 
a.col3 = c.col3", 81},
 
         // Specifically table A has 15 rows (10 on server1 and 5 on server2) 
and table B has 5 rows (all on server1),
         // thus the final JOIN result will be 15 x 1 = 15.
-        new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col2", 15},
+        new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1", 15},
+
+        // Specifically table A has 15 rows (10 on server1 and 5 on server2) 
and table B has 5 rows (all on server1),
+        // thus the final JOIN result will be 15 x 1 = 15.
+        new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = 
b.col2", 15},
 
         // Specifically table A has 15 rows (10 on server1 and 5 on server2) 
and table B has 5 rows (all on server1),
         // thus the final JOIN result will be 15 x 1 = 15.
@@ -167,7 +171,19 @@ public class QueryRunnerTest {
         // but only 1 out of 5 rows from table A will be selected out; and all 
in table B will be selected.
         // thus the final JOIN result will be 1 x 3 x 1 = 3.
         new Object[]{"SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON 
a.col1 = b.col2 "
-            + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0", 3},
+            + " WHERE a.col3 >= 0 AND a.col2 = 'alice' AND b.col3 >= 0", 3},
+
+        // Projection pushdown
+        new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0 
AND a.col2 = 'alice'", 3},
+
+        // Aggregation with group by
+        new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 
GROUP BY a.col1", 5},
+
+        // Aggregation with multiple group key
+        new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 
>= 0 GROUP BY a.col1, a.col2", 5},
+
+        // Aggregation without group by
+        new Object[]{"SELECT COUNT(*) FROM a WHERE a.col3 >= 0 AND a.col2 = 
'alice'", 1},
     };
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to