This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ad32a2a5ec adding in AggregateNode and related contents. (#8946) ad32a2a5ec is described below commit ad32a2a5ec113165217066ff9dd2a8b5d297ead1 Author: Rong Rong <walterddr.walter...@gmail.com> AuthorDate: Thu Jun 30 12:44:35 2022 -0700 adding in AggregateNode and related contents. (#8946) - adding agg split rules - adding agg split rules for leaf-intermediate split - adding agg operator conversion from input groups to intermediate group - support agg after JOIN as well - adding in agg without group by as well - fix add vs. replace selectList - also support multi-column group by key - misc - support multi-column JOIN - also fix hash distribution rule - validate that transform actually works with group-by Co-authored-by: Rong Rong <ro...@startree.ai> --- .../pinot/common/utils/request/RequestUtils.java | 2 +- .../tests/MultiStageEngineIntegrationTest.java | 7 +- .../query/parser/CalciteRexExpressionParser.java | 34 +++- .../query/planner/hints/PinotRelationalHints.java | 2 + .../query/planner/logical/RelToStageConverter.java | 8 + .../pinot/query/planner/logical/RexExpression.java | 17 +- .../pinot/query/planner/logical/StagePlanner.java | 3 +- .../partitioning/FieldSelectionKeySelector.java | 2 +- .../pinot/query/planner/stage/AggregateNode.java | 58 ++++++ .../query/planner/stage/StageNodeSerDeUtils.java | 2 + .../PinotAggregateExchangeNodeInsertRule.java | 181 ++++++++++++++++++ ...e.java => PinotJoinExchangeNodeInsertRule.java} | 10 +- .../pinot/query/rules/PinotQueryRuleSets.java | 3 +- .../pinot/query/QueryEnvironmentTestBase.java | 10 +- .../runtime/executor/WorkerQueryExecutor.java | 7 + .../query/runtime/operator/AggregateOperator.java | 211 +++++++++++++++++++++ .../query/runtime/utils/ServerRequestUtils.java | 10 +- .../apache/pinot/query/QueryServerEnclosure.java | 2 +- .../pinot/query/runtime/QueryRunnerTest.java | 22 ++- 19 files changed, 565 insertions(+), 26 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java index 32afe620fc..4f0f7bcca8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java @@ -114,7 +114,7 @@ public class RequestUtils { } public static Expression getFunctionExpression(String canonicalName) { - assert canonicalName.equals(canonicalizeFunctionNamePreservingSpecialKey(canonicalName)); + assert canonicalName.equalsIgnoreCase(canonicalizeFunctionNamePreservingSpecialKey(canonicalName)); Expression expression = new Expression(ExpressionType.FUNCTION); Function function = new Function(canonicalName); expression.setFunctionCall(function); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java index 5d149de4ed..0d78d2b182 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java @@ -113,10 +113,11 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest @DataProvider public Object[][] multiStageQueryEngineSqlTestSet() { return new Object[][] { - new Object[]{"SELECT * FROM mytable_OFFLINE", 10, 73}, + new Object[]{"SELECT COUNT(*) FROM mytable_OFFLINE WHERE Carrier='AA'", 1, 1}, + new Object[]{"SELECT * FROM mytable_OFFLINE WHERE ArrDelay>1000", 2, 73}, new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE WHERE CarrierDelay=15 AND ArrDelay>20", 10, 2}, - new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE AS b ON a.AirlineID = b.AirlineID " - + " WHERE a.CarrierDelay=15 AND a.ArrDelay>20 AND b.ArrDelay<20", 10, 146} + new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE AS b ON a.Origin = b.Origin " + + " WHERE a.Carrier='AA' AND a.ArrDelay>1000 AND b.ArrDelay>1000", 2, 146} }; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java index a4f5753557..70ee68b21c 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java @@ -53,8 +53,13 @@ public class CalciteRexExpressionParser { // Relational conversion Utils // -------------------------------------------------------------------------- - public static List<Expression> convertSelectList(List<RexExpression> rexNodeList, PinotQuery pinotQuery) { - List<Expression> selectExpr = new ArrayList<>(); + public static List<Expression> overwriteSelectList(List<RexExpression> rexNodeList, PinotQuery pinotQuery) { + return addSelectList(new ArrayList<>(), rexNodeList, pinotQuery); + } + + public static List<Expression> addSelectList(List<Expression> existingList, List<RexExpression> rexNodeList, + PinotQuery pinotQuery) { + List<Expression> selectExpr = new ArrayList<>(existingList); final Iterator<RexExpression> iterator = rexNodeList.iterator(); while (iterator.hasNext()) { @@ -65,6 +70,18 @@ public class CalciteRexExpressionParser { return selectExpr; } + public static List<Expression> convertGroupByList(List<RexExpression> rexNodeList, PinotQuery pinotQuery) { + List<Expression> groupByExpr = new ArrayList<>(); + + final Iterator<RexExpression> iterator = rexNodeList.iterator(); + while (iterator.hasNext()) { + final RexExpression next = iterator.next(); + groupByExpr.add(toExpression(next, pinotQuery)); + } + + return groupByExpr; + } + private static List<Expression> convertDistinctSelectList(RexExpression.FunctionCall rexCall, PinotQuery pinotQuery) { List<Expression> selectExpr = new ArrayList<>(); selectExpr.add(convertDistinctAndSelectListToFunctionExpression(rexCall, pinotQuery)); @@ -169,7 +186,7 @@ public class CalciteRexExpressionParser { operands.add(toExpression(childNode, pinotQuery)); } ParserUtils.validateFunction(functionName, operands); - Expression functionExpression = RequestUtils.getFunctionExpression(functionName); + Expression functionExpression = RequestUtils.getFunctionExpression(canonicalizeFunctionName(functionName)); functionExpression.getFunctionCall().setOperands(operands); return functionExpression; } @@ -209,4 +226,15 @@ public class CalciteRexExpressionParser { andExpression.getFunctionCall().setOperands(operands); return andExpression; } + + /** + * Canonicalize Calcite generated Logical function names. + */ + private static String canonicalizeFunctionName(String functionName) { + if (functionName.endsWith("0")) { + return functionName.substring(0, functionName.length() - 1); + } else { + return functionName; + } + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java index 19a9daa54f..2c4cb976a6 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java @@ -27,6 +27,8 @@ import org.apache.calcite.rel.hint.RelHint; public class PinotRelationalHints { public static final RelHint USE_HASH_DISTRIBUTE = RelHint.builder("USE_HASH_DISTRIBUTE").build(); public static final RelHint USE_BROADCAST_DISTRIBUTE = RelHint.builder("USE_BROADCAST_DISTRIBUTE").build(); + public static final RelHint AGG_INTERMEDIATE_STAGE = RelHint.builder("AGG_INTERMEDIATE_STAGE").build(); + public static final RelHint AGG_LEAF_STAGE = RelHint.builder("AGG_LEAF_STAGE").build(); private PinotRelationalHints() { // do not instantiate. diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java index bc6d7dc4ca..23f8fb6db8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; @@ -32,6 +33,7 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexCall; import org.apache.pinot.query.planner.PlannerUtils; import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; +import org.apache.pinot.query.planner.stage.AggregateNode; import org.apache.pinot.query.planner.stage.FilterNode; import org.apache.pinot.query.planner.stage.JoinNode; import org.apache.pinot.query.planner.stage.ProjectNode; @@ -65,11 +67,17 @@ public final class RelToStageConverter { return convertLogicalProject((LogicalProject) node, currentStageId); } else if (node instanceof LogicalFilter) { return convertLogicalFilter((LogicalFilter) node, currentStageId); + } else if (node instanceof LogicalAggregate) { + return convertLogicalAggregate((LogicalAggregate) node, currentStageId); } else { throw new UnsupportedOperationException("Unsupported logical plan node: " + node); } } + private static StageNode convertLogicalAggregate(LogicalAggregate node, int currentStageId) { + return new AggregateNode(currentStageId, node.getAggCallList(), node.getGroupSet()); + } + private static StageNode convertLogicalProject(LogicalProject node, int currentStageId) { return new ProjectNode(currentStageId, node.getProjects()); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java index 17e472c811..778907e40a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java @@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.logical; import java.math.BigDecimal; import java.util.List; import java.util.stream.Collectors; +import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; @@ -62,18 +63,24 @@ public interface RexExpression { } } + static RexExpression toRexExpression(AggregateCall aggCall) { + List<RexExpression> operands = aggCall.getArgList().stream().map(InputRef::new).collect(Collectors.toList()); + return new RexExpression.FunctionCall(aggCall.getAggregation().getKind(), toDataType(aggCall.getType()), + aggCall.getAggregation().getName(), operands); + } + static Object toRexValue(FieldSpec.DataType dataType, Comparable value) { switch (dataType) { case INT: - return ((BigDecimal) value).intValue(); + return value == null ? 0 : ((BigDecimal) value).intValue(); case LONG: - return ((BigDecimal) value).longValue(); + return value == null ? 0L : ((BigDecimal) value).longValue(); case FLOAT: - return ((BigDecimal) value).floatValue(); + return value == null ? 0f : ((BigDecimal) value).floatValue(); case DOUBLE: - return ((BigDecimal) value).doubleValue(); + return value == null ? 0d : ((BigDecimal) value).doubleValue(); case STRING: - return ((NlsString) value).getValue(); + return value == null ? "" : ((NlsString) value).getValue(); default: return value; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java index db7fd63d83..8d9bbbc51d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java @@ -99,13 +99,14 @@ public class StagePlanner { // 1. exchangeNode always have only one input, get its input converted as a new stage root. StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId()); RelDistribution distribution = ((LogicalExchange) node).getDistribution(); + List<Integer> distributionKeys = distribution.getKeys(); RelDistribution.Type exchangeType = distribution.getType(); // 2. make an exchange sender and receiver node pair StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getStageId(), exchangeType); StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), mailboxReceiver.getStageId(), exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED - ? new FieldSelectionKeySelector(distribution.getKeys().get(0)) : null); + ? new FieldSelectionKeySelector(distributionKeys) : null); mailboxSender.addInput(nextStageRoot); // 3. put the sender side as a completed stage. diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java index 674cc8e2a2..fd04ca589e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java @@ -71,6 +71,6 @@ public class FieldSelectionKeySelector implements KeySelector<Object[], Object[] for (int columnIndex : _columnIndices) { hashCodeBuilder.append(input[columnIndex]); } - return hashCodeBuilder.toHashCode(); + return Math.abs(hashCodeBuilder.toHashCode()); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java new file mode 100644 index 0000000000..ae41d14a79 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.stage; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.pinot.query.planner.logical.RexExpression; +import org.apache.pinot.query.planner.serde.ProtoProperties; + + +public class AggregateNode extends AbstractStageNode { + @ProtoProperties + private List<RexExpression> _aggCalls; + @ProtoProperties + private List<RexExpression> _groupSet; + + public AggregateNode(int stageId) { + super(stageId); + } + + public AggregateNode(int stageId, List<AggregateCall> aggCalls, ImmutableBitSet groupSet) { + super(stageId); + _aggCalls = aggCalls.stream().map(RexExpression::toRexExpression).collect(Collectors.toList()); + _groupSet = new ArrayList<>(groupSet.cardinality()); + Iterator<Integer> groupSetIt = groupSet.iterator(); + while (groupSetIt.hasNext()) { + _groupSet.add(new RexExpression.InputRef(groupSetIt.next())); + } + } + + public List<RexExpression> getAggCalls() { + return _aggCalls; + } + + public List<RexExpression> getGroupSet() { + return _groupSet; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java index 3d34f6effb..8d341a207c 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java @@ -56,6 +56,8 @@ public final class StageNodeSerDeUtils { return new ProjectNode(stageId); case "FilterNode": return new FilterNode(stageId); + case "AggregateNode": + return new AggregateNode(stageId); case "MailboxSendNode": return new MailboxSendNode(stageId); case "MailboxReceiveNode": diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java new file mode 100644 index 0000000000..1150bc2085 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.rules; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelDistributions; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.logical.LogicalExchange; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.pinot.query.planner.hints.PinotRelationalHints; + + +/** + * Special rule for Pinot, this rule is fixed to generate a 2-stage aggregation split between the + * (1) non-data-locale Pinot server agg stage, and (2) the data-locale Pinot intermediate agg stage. + * + * Pinot uses special intermediate data representation for partially aggregated results, thus we can't use + * {@link org.apache.calcite.rel.rules.AggregateReduceFunctionsRule} to reduce complex aggregation. + * + * This rule is here to introduces Pinot-special aggregation splits. In-general, all aggregations are split into + * intermediate-stage AGG; and server-stage AGG with the same naming. E.g. + * + * COUNT(*) transforms into: COUNT(*)_SERVER --> COUNT(*)_INTERMEDIATE, where + * COUNT(*)_SERVER produces TUPLE[ SUM(1), GROUP_BY_KEY ] + * COUNT(*)_INTERMEDIATE produces TUPLE[ SUM(COUNT(*)_SERVER), GROUP_BY_KEY ] + * + * However, the suffix _SERVER/_INTERMEDIATE is merely a SQL hint to the Aggregate operator and will be translated + * into correct, actual operator chain during Physical plan. + */ +public class PinotAggregateExchangeNodeInsertRule extends RelOptRule { + public static final PinotAggregateExchangeNodeInsertRule INSTANCE = + new PinotAggregateExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER); + + public PinotAggregateExchangeNodeInsertRule(RelBuilderFactory factory) { + super(operand(LogicalAggregate.class, any()), factory, null); + } + + @Override + public boolean matches(RelOptRuleCall call) { + if (call.rels.length < 1) { + return false; + } + if (call.rel(0) instanceof Aggregate) { + Aggregate agg = call.rel(0); + return !agg.getHints().contains(PinotRelationalHints.AGG_LEAF_STAGE) + && !agg.getHints().contains(PinotRelationalHints.AGG_INTERMEDIATE_STAGE); + } + return false; + } + + /** + * Split the AGG into 2 stages, both with the same AGG type, + * Pinot internal stage optimization can use the info of the input data type to infer whether it should generate + * the "intermediate-stage AGG operator" or a "leaf-stage AGG operator" + * @see org.apache.pinot.core.query.aggregation.function.AggregationFunction + * + * @param call the {@link RelOptRuleCall} on match. + */ + @Override + public void onMatch(RelOptRuleCall call) { + Aggregate oldAggRel = call.rel(0); + ImmutableList<RelHint> orgHints = oldAggRel.getHints(); + + // 1. attach leaf agg RelHint to original agg. + ImmutableList<RelHint> newLeafAggHints = + new ImmutableList.Builder<RelHint>().addAll(orgHints).add(PinotRelationalHints.AGG_LEAF_STAGE).build(); + Aggregate newLeafAgg = + new LogicalAggregate(oldAggRel.getCluster(), oldAggRel.getTraitSet(), newLeafAggHints, oldAggRel.getInput(), + oldAggRel.getGroupSet(), oldAggRel.getGroupSets(), oldAggRel.getAggCallList()); + + // 2. attach exchange. + List<Integer> groupSetIndices = ImmutableIntList.range(0, oldAggRel.getGroupCount()); + LogicalExchange exchange = null; + if (groupSetIndices.size() == 0) { + exchange = LogicalExchange.create(newLeafAgg, RelDistributions.SINGLETON); + } else { + exchange = LogicalExchange.create(newLeafAgg, RelDistributions.hash(groupSetIndices)); + } + + // 3. attach intermediate agg stage. + RelNode newAggNode = makeNewIntermediateAgg(call, oldAggRel, exchange); + call.transformTo(newAggNode); + } + + private RelNode makeNewIntermediateAgg(RelOptRuleCall ruleCall, Aggregate oldAggRel, LogicalExchange exchange) { + + // add the exchange as the input node to the relation builder. + RelBuilder relBuilder = ruleCall.builder(); + relBuilder.push(exchange); + List<RexNode> inputExprs = new ArrayList<>(relBuilder.fields()); + + // make input ref to the exchange after the leaf aggregate. + RexBuilder rexBuilder = exchange.getCluster().getRexBuilder(); + final int nGroups = oldAggRel.getGroupCount(); + for (int i = 0; i < nGroups; i++) { + rexBuilder.makeInputRef(oldAggRel, i); + } + + // create new aggregate function calls from exchange input. + List<AggregateCall> oldCalls = oldAggRel.getAggCallList(); + List<AggregateCall> newCalls = new ArrayList<>(); + Map<AggregateCall, RexNode> aggCallMapping = new HashMap<>(); + + for (int oldCallIndex = 0; oldCallIndex < oldCalls.size(); oldCallIndex++) { + AggregateCall oldCall = oldCalls.get(oldCallIndex); + convertAggCall(rexBuilder, oldAggRel, oldCallIndex, oldCall, newCalls, aggCallMapping, inputExprs); + } + + // create new aggregate relation. + ImmutableList<RelHint> orgHints = oldAggRel.getHints(); + ImmutableList<RelHint> newIntermediateAggHints = + new ImmutableList.Builder<RelHint>().addAll(orgHints).add(PinotRelationalHints.AGG_INTERMEDIATE_STAGE).build(); + ImmutableBitSet groupSet = ImmutableBitSet.range(nGroups); + relBuilder.aggregate( + relBuilder.groupKey(groupSet, ImmutableList.of(groupSet)), + newCalls); + relBuilder.hints(newIntermediateAggHints); + return relBuilder.build(); + } + + /** + * convert aggregate call based on the intermediate stage input. + * + * <p>Note that the intermediate stage input only supports splittable aggregators such as SUM/MIN/MAX. + * All non-splittable aggregator must be converted into splittable aggregator first. + */ + private static void convertAggCall(RexBuilder rexBuilder, Aggregate oldAggRel, int oldCallIndex, + AggregateCall oldCall, List<AggregateCall> newCalls, Map<AggregateCall, RexNode> aggCallMapping, + List<RexNode> inputExprs) { + final int nGroups = oldAggRel.getGroupCount(); + AggregateCall newCall = AggregateCall.create( + oldCall.getAggregation(), oldCall.isDistinct(), oldCall.isApproximate(), oldCall.ignoreNulls(), + convertArgList(nGroups + oldCallIndex, oldCall.getArgList()), oldCall.filterArg, oldCall.distinctKeys, + oldCall.collation, oldCall.type, oldCall.getName()); + rexBuilder.addAggCall(newCall, + nGroups, + newCalls, + aggCallMapping, + oldAggRel.getInput()::fieldIsNullable); + } + + private static List<Integer> convertArgList(int oldCallIndexWithShift, List<Integer> argList) { + Preconditions.checkArgument(argList.size() <= 1, + "Unable to convert call as the argList contains more than 1 argument"); + return argList.size() == 1 ? Collections.singletonList(oldCallIndexWithShift) : Collections.emptyList(); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java similarity index 91% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java index 2a1b740669..6aaacccac1 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java @@ -38,13 +38,13 @@ import org.apache.pinot.query.planner.hints.PinotRelationalHints; /** - * Special rule for Pinot, always insert exchange after JOIN + * Special rule for Pinot, this rule is fixed to always insert exchange after JOIN node. */ -public class PinotExchangeNodeInsertRule extends RelOptRule { - public static final PinotExchangeNodeInsertRule INSTANCE = - new PinotExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER); +public class PinotJoinExchangeNodeInsertRule extends RelOptRule { + public static final PinotJoinExchangeNodeInsertRule INSTANCE = + new PinotJoinExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER); - public PinotExchangeNodeInsertRule(RelBuilderFactory factory) { + public PinotJoinExchangeNodeInsertRule(RelBuilderFactory factory) { super(operand(LogicalJoin.class, any()), factory, null); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java index 63c2fd799f..1f3759b7b4 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java @@ -89,5 +89,6 @@ public class PinotQueryRuleSets { PruneEmptyRules.UNION_INSTANCE, // Pinot specific rules - PinotExchangeNodeInsertRule.INSTANCE); + PinotJoinExchangeNodeInsertRule.INSTANCE, + PinotAggregateExchangeNodeInsertRule.INSTANCE); } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java index 176bf3edc8..1c6ad7ff29 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java @@ -47,7 +47,15 @@ public class QueryEnvironmentTestBase { new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0"}, new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2"}, new Object[]{"SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2 " - + "WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0"}, + + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0"}, + new Object[]{"SELECT a.col1, a.col3 + a.ts FROM a WHERE a.col3 >= 0 AND a.col2 = 'a'"}, + new Object[]{"SELECT SUM(a.col3), COUNT(*) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a'"}, + new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY a.col1"}, + new Object[]{"SELECT a.col1, AVG(a.col3) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY a.col1"}, + new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND a.col1 = 'a' " + + " GROUP BY a.col1, a.col2"}, + new Object[]{"SELECT a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 " + + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1"}, }; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java index 01a97643ab..379d7bdbd6 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java @@ -30,6 +30,7 @@ import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.util.trace.TraceRunnable; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.StageMetadata; +import org.apache.pinot.query.planner.stage.AggregateNode; import org.apache.pinot.query.planner.stage.FilterNode; import org.apache.pinot.query.planner.stage.JoinNode; import org.apache.pinot.query.planner.stage.MailboxReceiveNode; @@ -38,6 +39,7 @@ import org.apache.pinot.query.planner.stage.ProjectNode; import org.apache.pinot.query.planner.stage.StageNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.operator.AggregateOperator; import org.apache.pinot.query.runtime.operator.HashJoinOperator; import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; @@ -117,6 +119,11 @@ public class WorkerQueryExecutor { BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap); BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap); return new HashJoinOperator(leftOperator, rightOperator, joinNode.getCriteria()); + } else if (stageNode instanceof AggregateNode) { + AggregateNode aggregateNode = (AggregateNode) stageNode; + BaseOperator<TransferableBlock> inputOperator = + getOperator(requestId, aggregateNode.getInputs().get(0), metadataMap); + return new AggregateOperator(inputOperator, aggregateNode.getAggCalls(), aggregateNode.getGroupSet()); } else if (stageNode instanceof FilterNode) { throw new UnsupportedOperationException("Unsupported!"); } else if (stageNode instanceof ProjectNode) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java new file mode 100644 index 0000000000..95a5dcd03f --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.common.datablock.BaseDataBlock; +import org.apache.pinot.core.common.datablock.DataBlockBuilder; +import org.apache.pinot.core.common.datablock.DataBlockUtils; +import org.apache.pinot.core.data.table.Key; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.function.CountAggregationFunction; +import org.apache.pinot.core.query.aggregation.function.MaxAggregationFunction; +import org.apache.pinot.core.query.aggregation.function.MinAggregationFunction; +import org.apache.pinot.core.query.aggregation.function.SumAggregationFunction; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.apache.pinot.query.planner.logical.RexExpression; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; + + +/** + * + */ +public class AggregateOperator extends BaseOperator<TransferableBlock> { + private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR"; + + private BaseOperator<TransferableBlock> _inputOperator; + private List<RexExpression> _aggCalls; + private List<RexExpression> _groupSet; + + private final AggregationFunction[] _aggregationFunctions; + private final Map<Integer, Object>[] _groupByResultHolders; + private final Map<Integer, Object[]> _groupByKeyHolder; + + private DataSchema _dataSchema; + private boolean _isCumulativeBlockConstructed; + + // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator. + public AggregateOperator(BaseOperator<TransferableBlock> inputOperator, List<RexExpression> aggCalls, + List<RexExpression> groupSet) { + _inputOperator = inputOperator; + _aggCalls = aggCalls; + _groupSet = groupSet; + + _aggregationFunctions = new AggregationFunction[_aggCalls.size()]; + _groupByResultHolders = new Map[_aggCalls.size()]; + _groupByKeyHolder = new HashMap<Integer, Object[]>(); + for (int i = 0; i < aggCalls.size(); i++) { + _aggregationFunctions[i] = (toAggregationFunction(aggCalls.get(i))); + _groupByResultHolders[i] = new HashMap<Integer, Object>(); + } + + _isCumulativeBlockConstructed = false; + } + + @Override + public List<Operator> getChildOperators() { + // WorkerExecutor doesn't use getChildOperators, returns null here. + return null; + } + + @Nullable + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + @Override + protected TransferableBlock getNextBlock() { + try { + cumulateAggregationBlocks(); + return new TransferableBlock(toResultBlock()); + } catch (Exception e) { + return TransferableBlockUtils.getErrorTransferableBlock(e); + } + } + + private BaseDataBlock toResultBlock() + throws IOException { + if (!_isCumulativeBlockConstructed) { + List<Object[]> rows = new ArrayList<>(_groupByKeyHolder.size()); + for (Map.Entry<Integer, Object[]> e : _groupByKeyHolder.entrySet()) { + Object[] row = new Object[_aggregationFunctions.length + _groupSet.size()]; + Object[] keyElements = e.getValue(); + for (int i = 0; i < keyElements.length; i++) { + row[i] = keyElements[i]; + } + for (int i = 0; i < _groupByResultHolders.length; i++) { + row[i + _groupSet.size()] = _groupByResultHolders[i].get(e.getKey()); + } + rows.add(row); + } + _isCumulativeBlockConstructed = true; + if (rows.size() == 0) { + return DataBlockUtils.getEmptyDataBlock(_dataSchema); + } else { + return DataBlockBuilder.buildFromRows(rows, null, _dataSchema); + } + } else { + return DataBlockUtils.getEndOfStreamDataBlock(); + } + } + + private void cumulateAggregationBlocks() { + TransferableBlock block = _inputOperator.nextBlock(); + while (!TransferableBlockUtils.isEndOfStream(block)) { + BaseDataBlock dataBlock = block.getDataBlock(); + if (_dataSchema == null) { + _dataSchema = dataBlock.getDataSchema(); + } + int numRows = dataBlock.getNumberOfRows(); + for (int rowId = 0; rowId < numRows; rowId++) { + Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId); + Key key = extraRowKey(row, _groupSet); + int keyHashCode = key.hashCode(); + _groupByKeyHolder.put(keyHashCode, key.getValues()); + for (int i = 0; i < _aggregationFunctions.length; i++) { + Object currentRes = _groupByResultHolders[i].get(keyHashCode); + if (currentRes == null) { + _groupByResultHolders[i].put(keyHashCode, row[i + _groupSet.size()]); + } else { + _groupByResultHolders[i].put(keyHashCode, + merge(_aggCalls.get(i), currentRes, row[i + _groupSet.size()])); + } + } + } + block = _inputOperator.nextBlock(); + } + } + + private AggregationFunction toAggregationFunction(RexExpression aggCall) { + Preconditions.checkState(aggCall instanceof RexExpression.FunctionCall); + switch (((RexExpression.FunctionCall) aggCall).getFunctionName()) { + case "$SUM": + case "$SUM0": + return new SumAggregationFunction( + ExpressionContext.forIdentifier( + ((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString())); + case "$COUNT": + case "COUNT": + return new CountAggregationFunction(); + case "$MIN": + case "$MIN0": + return new MinAggregationFunction( + ExpressionContext.forIdentifier( + ((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString())); + case "$MAX": + case "$MAX0": + return new MaxAggregationFunction( + ExpressionContext.forIdentifier( + ((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString())); + default: + throw new IllegalStateException( + "Unexpected value: " + ((RexExpression.FunctionCall) aggCall).getFunctionName()); + } + } + + private Object merge(RexExpression aggCall, Object left, Object right) { + Preconditions.checkState(aggCall instanceof RexExpression.FunctionCall); + switch (((RexExpression.FunctionCall) aggCall).getFunctionName()) { + case "$SUM": + case "$SUM0": + return (double) left + (double) right; + case "$COUNT": + return (int) left + (int) right; + case "$MIN": + case "$MIN0": + return Math.min((double) left, (double) right); + case "$MAX": + case "$MAX0": + return Math.max((double) left, (double) right); + default: + throw new IllegalStateException( + "Unexpected value: " + ((RexExpression.FunctionCall) aggCall).getFunctionName()); + } + } + + private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) { + Object[] keyElements = new Object[groupSet.size()]; + for (int i = 0; i < groupSet.size(); i++) { + keyElements[i] = row[((RexExpression.InputRef) groupSet.get(i)).getIndex()]; + } + return new Key(keyElements); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java index 86ec13adcf..15a8b0194c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java @@ -29,6 +29,7 @@ import org.apache.pinot.common.request.QuerySource; import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.query.parser.CalciteRexExpressionParser; +import org.apache.pinot.query.planner.stage.AggregateNode; import org.apache.pinot.query.planner.stage.FilterNode; import org.apache.pinot.query.planner.stage.MailboxSendNode; import org.apache.pinot.query.planner.stage.ProjectNode; @@ -104,8 +105,15 @@ public class ServerRequestUtils { pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression( ((FilterNode) node).getCondition(), pinotQuery)); } else if (node instanceof ProjectNode) { - pinotQuery.setSelectList(CalciteRexExpressionParser.convertSelectList( + pinotQuery.setSelectList(CalciteRexExpressionParser.overwriteSelectList( ((ProjectNode) node).getProjects(), pinotQuery)); + } else if (node instanceof AggregateNode) { + // set agg list + pinotQuery.setSelectList(CalciteRexExpressionParser.addSelectList(pinotQuery.getSelectList(), + ((AggregateNode) node).getAggCalls(), pinotQuery)); + // set group-by list + pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList( + ((AggregateNode) node).getGroupSet(), pinotQuery)); } else if (node instanceof MailboxSendNode) { // TODO: MailboxSendNode should be the root of the leaf stage. but ignore for now since it is handle seperately // in QueryRunner as a single step sender. diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java index 0e60a4a533..2ef4958432 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java @@ -142,7 +142,7 @@ public class QueryServerEnclosure { for (int i = 0; i < NUM_ROWS; i++) { GenericRow row = new GenericRow(); row.putValue("col1", STRING_FIELD_LIST[i % STRING_FIELD_LIST.length]); - row.putValue("col2", STRING_FIELD_LIST[(i + 2) % STRING_FIELD_LIST.length]); + row.putValue("col2", STRING_FIELD_LIST[i % (STRING_FIELD_LIST.length - 2)]); row.putValue("col3", INT_FIELD_LIST[i % INT_FIELD_LIST.length]); row.putValue("ts", System.currentTimeMillis()); rows.add(row); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java index 6ae43ce0cd..b409a4b063 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java @@ -153,11 +153,15 @@ public class QueryRunnerTest { // Next join with table C which has (5 on server1 and 10 on server2), since data is identical. each of the row // of the A JOIN B will have identical value of col3 as table C.col3 has. Since the values are cycling between // (1, 2, 42, 1, 2). we will have 6 1s, 6 2s, and 3 42s, total result count will be 36 + 36 + 9 = 81 - new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 JOIN c ON a.col3 = c.col3", 81}, + new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON a.col3 = c.col3", 81}, // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1), // thus the final JOIN result will be 15 x 1 = 15. - new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col2", 15}, + new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1", 15}, + + // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1), + // thus the final JOIN result will be 15 x 1 = 15. + new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2", 15}, // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1), // thus the final JOIN result will be 15 x 1 = 15. @@ -167,7 +171,19 @@ public class QueryRunnerTest { // but only 1 out of 5 rows from table A will be selected out; and all in table B will be selected. // thus the final JOIN result will be 1 x 3 x 1 = 3. new Object[]{"SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON a.col1 = b.col2 " - + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0", 3}, + + " WHERE a.col3 >= 0 AND a.col2 = 'alice' AND b.col3 >= 0", 3}, + + // Projection pushdown + new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'", 3}, + + // Aggregation with group by + new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 GROUP BY a.col1", 5}, + + // Aggregation with multiple group key + new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 GROUP BY a.col1, a.col2", 5}, + + // Aggregation without group by + new Object[]{"SELECT COUNT(*) FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'", 1}, }; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org