This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch multi_stage_query_engine
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 3b01b2dec835267abb7db59559270e427e8b3b7c
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Apr 22 08:57:29 2022 -0700

    add support for project/filter pushdown (#8558)

    * make planner node ready for project/filter pushdown

      making filter expression compilation work, generating operator;
      add ProjectNode compilation as well

    * fixing comments from previous PR
      - add float type
      - change serde javadoc
      - change function name for StageNode serializable
      - change function name in proto utils

    * add distributed hash join capability

    * fixing serde

    * refactor calcite components into sql/rex/expression 3 subclasses

    * address diff comments
      1. renamed BroadcastJoin to HashJoin
      2. relocate packages inside planner from planner.node to planner.logical/serde/stage based on their functionalities
      3. added SqlHint strategy to planner so that it can do either hash or broadcast join

    * additional comment address
      1. remove CalciteExpressionParser as it is not used
      2. remove rowType in construction for AbstractStageNode

    * rename RelHint

    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 pinot-common/src/main/proto/plan.proto | 5 +-
 pinot-query-planner/pom.xml | 4 +-
 .../org/apache/pinot/query/QueryEnvironment.java | 12 +-
 .../query/parser/CalciteExpressionParser.java | 502 ---------------------
 .../query/parser/CalciteRexExpressionParser.java | 217 +++++++++
 .../pinot/query/parser/CalciteSqlParser.java | 2 +-
 .../org/apache/pinot/query/parser/ParserUtils.java | 55 +++
 .../org/apache/pinot/query/planner/QueryPlan.java | 3 +-
 .../apache/pinot/query/planner/StageMetadata.java | 4 +-
 .../PinotRelationalHints.java} | 24 +-
 .../planner/{ => logical}/LogicalPlanner.java | 2 +-
 .../planner/{ => logical}/RelToStageConverter.java | 39 +-
 .../pinot/query/planner/logical/RexExpression.java | 157 +++++++
 .../query/planner/{ => logical}/StagePlanner.java | 31 +-
 .../partitioning/FieldSelectionKeySelector.java | 3 +
 .../query/planner/partitioning/KeySelector.java | 4 +-
 .../ProtoProperties.java} | 19 +-
 .../{nodes => }/serde/ProtoSerializable.java | 20 +-
 .../{nodes => }/serde/ProtoSerializationUtils.java | 117 +++--
 .../{nodes => stage}/AbstractStageNode.java | 24 +-
 .../{nodes/CalcNode.java => stage/FilterNode.java} | 24 +-
 .../query/planner/{nodes => stage}/JoinNode.java | 12 +-
 .../{nodes => stage}/MailboxReceiveNode.java | 9 +-
 .../planner/{nodes => stage}/MailboxSendNode.java | 24 +-
 .../TableScanNode.java => stage/ProjectNode.java} | 32 +-
 .../query/planner/{nodes => stage}/StageNode.java | 2 +-
 .../StageNodeSerDeUtils.java} | 16 +-
 .../planner/{nodes => stage}/TableScanNode.java | 9 +-
 .../query/rules/PinotExchangeNodeInsertRule.java | 24 +-
 .../pinot/query/rules/PinotQueryRuleSets.java | 2 +
 .../apache/pinot/query/QueryEnvironmentTest.java | 80 ++--
 .../pinot/query/QueryEnvironmentTestBase.java | 52 +++
 .../pinot/query/planner/stage/SerDeUtilsTest.java | 81 ++++
 .../apache/pinot/query/runtime/QueryRunner.java | 6 +-
 .../runtime/executor/WorkerQueryExecutor.java | 33 +-
 ...castJoinOperator.java => HashJoinOperator.java} | 6 +-
 .../runtime/operator/MailboxSendOperator.java | 52 ++-
 .../query/runtime/plan/DistributedStagePlan.java | 2 +-
 .../runtime/plan/serde/QueryPlanSerDeUtils.java | 8 +-
 .../query/runtime/utils/ServerRequestUtils.java | 33 +-
 .../pinot/query/service/QueryDispatcher.java | 2 +-
 .../pinot/query/runtime/QueryRunnerTest.java | 170 ++----
 .../pinot/query/service/QueryServerTest.java | 40 +-
 43 files changed, 1071 insertions(+), 892 deletions(-)

diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto
index 47018197fc..8e75a31a42 100644
--- a/pinot-common/src/main/proto/plan.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -76,8 +76,9 @@ message LiteralField {
     bool boolField = 1;
     int32 intField = 2;
     int64 longField = 3;
-    double doubleField = 4;
-    string stringField = 5;
+    float floatField = 4;
+    double doubleField = 5;
+    string stringField = 6;
   }
 }
diff --git a/pinot-query-planner/pom.xml b/pinot-query-planner/pom.xml
index 05b9461cdc..68ee3a2057 100644
--- a/pinot-query-planner/pom.xml
+++ b/pinot-query-planner/pom.xml
@@ -70,12 +70,12 @@
     <dependency>
      <groupId>org.codehaus.janino</groupId>
      <artifactId>janino</artifactId>
-      <version>3.0.9</version>
+      <version>3.1.6</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.janino</groupId>
      <artifactId>commons-compiler</artifactId>
-      <version>3.0.9</version>
+      <version>3.1.6</version>
    </dependency>
    <dependency>
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 215c0051d7..3a19156287 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -31,6 +31,7 @@ import org.apache.calcite.prepare.PlannerImpl;
 import org.apache.calcite.prepare.Prepare;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.sql.SqlKind;
@@ -43,9 +44,9 @@ import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.parser.CalciteSqlParser;
-import org.apache.pinot.query.planner.LogicalPlanner;
 import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.StagePlanner;
+import org.apache.pinot.query.planner.logical.LogicalPlanner;
+import org.apache.pinot.query.planner.logical.StagePlanner;
 import org.apache.pinot.query.routing.WorkerManager;
 import org.apache.pinot.query.rules.PinotQueryRuleSets;
 import org.apache.pinot.query.type.TypeFactory;
@@ -157,10 +158,15 @@ public class QueryEnvironment {
     RelOptCluster cluster = RelOptCluster.create(_relOptPlanner, rexBuilder);
     SqlToRelConverter sqlToRelConverter =
         new SqlToRelConverter(_planner, _validator, _catalogReader, cluster, StandardConvertletTable.INSTANCE,
-            SqlToRelConverter.config());
+            SqlToRelConverter.config().withHintStrategyTable(getHintStrategyTable(plannerContext)));
     return sqlToRelConverter.convertQuery(parsed, false, true);
   }

+  // TODO: add hint strategy table based on plannerContext.
+  private HintStrategyTable getHintStrategyTable(PlannerContext plannerContext) {
+    return HintStrategyTable.builder().build();
+  }
+
   protected RelNode optimize(RelRoot relRoot, PlannerContext plannerContext) {
     // 4. optimize relNode
     // TODO: add support for traits, cost factory.
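Side note on the hint wiring above: the committed getHintStrategyTable() is an empty stub (per its TODO), so no hints are actually registered yet; the commit only threads the table into SqlToRelConverter. The sketch below shows one way the stub might eventually accept the two join-distribution hints defined in PinotRelationalHints later in this diff. The HintPredicates.JOIN restriction and the builder calls are assumptions about a possible follow-up, not part of this commit.

    import org.apache.calcite.rel.hint.HintPredicates;
    import org.apache.calcite.rel.hint.HintStrategyTable;

    // Hypothetical follow-up: accept the USE_HASH_DISTRIBUTE / USE_BROADCAST_DISTRIBUTE
    // RelHints defined in PinotRelationalHints, and only allow them on join nodes.
    private HintStrategyTable getHintStrategyTable(PlannerContext plannerContext) {
      return HintStrategyTable.builder()
          .hintStrategy("USE_HASH_DISTRIBUTE", HintPredicates.JOIN)
          .hintStrategy("USE_BROADCAST_DISTRIBUTE", HintPredicates.JOIN)
          .build();
    }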
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java deleted file mode 100644 index fc75efb1ae..0000000000 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java +++ /dev/null @@ -1,502 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.query.parser; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import org.apache.calcite.sql.SqlBasicCall; -import org.apache.calcite.sql.SqlDataTypeSpec; -import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlLiteral; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.fun.SqlCase; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.parser.SqlParser; -import org.apache.commons.lang3.StringUtils; -import org.apache.pinot.common.request.Expression; -import org.apache.pinot.common.request.ExpressionType; -import org.apache.pinot.common.request.Function; -import org.apache.pinot.common.utils.request.RequestUtils; -import org.apache.pinot.segment.spi.AggregationFunctionType; -import org.apache.pinot.sql.parsers.SqlCompilationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Calcite parser to convert SQL expressions into {@link Expression}. - * - * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser}. It only contains the - * {@link Expression} related info, this is used for ingestion and query rewrite. - */ -public class CalciteExpressionParser { - private static final Logger LOGGER = LoggerFactory.getLogger(CalciteExpressionParser.class); - - private CalciteExpressionParser() { - // do not instantiate. 
- } - - private static List<Expression> getAliasLeftExpressionsFromDistinctExpression(Function function) { - List<Expression> operands = function.getOperands(); - List<Expression> expressions = new ArrayList<>(operands.size()); - for (Expression operand : operands) { - if (isAsFunction(operand)) { - expressions.add(operand.getFunctionCall().getOperands().get(0)); - } else { - expressions.add(operand); - } - } - return expressions; - } - - public static boolean isAggregateExpression(Expression expression) { - Function functionCall = expression.getFunctionCall(); - if (functionCall != null) { - String operator = functionCall.getOperator(); - try { - AggregationFunctionType.getAggregationFunctionType(operator); - return true; - } catch (IllegalArgumentException e) { - } - if (functionCall.getOperandsSize() > 0) { - for (Expression operand : functionCall.getOperands()) { - if (isAggregateExpression(operand)) { - return true; - } - } - } - } - return false; - } - - public static boolean isAsFunction(Expression expression) { - return expression.getFunctionCall() != null && expression.getFunctionCall().getOperator().equalsIgnoreCase("AS"); - } - - /** - * Extract all the identifiers from given expressions. - * - * @param expressions - * @param excludeAs if true, ignores the right side identifier for AS function. - * @return all the identifier names. - */ - public static Set<String> extractIdentifiers(List<Expression> expressions, boolean excludeAs) { - Set<String> identifiers = new HashSet<>(); - for (Expression expression : expressions) { - if (expression.getIdentifier() != null) { - identifiers.add(expression.getIdentifier().getName()); - } else if (expression.getFunctionCall() != null) { - if (excludeAs && expression.getFunctionCall().getOperator().equalsIgnoreCase("AS")) { - identifiers.addAll( - extractIdentifiers(Arrays.asList(expression.getFunctionCall().getOperands().get(0)), true)); - continue; - } else { - identifiers.addAll(extractIdentifiers(expression.getFunctionCall().getOperands(), excludeAs)); - } - } - } - return identifiers; - } - - /** - * Compiles a String expression into {@link Expression}. - * - * @param expression String expression. - * @return {@link Expression} equivalent of the string. - * - * @throws SqlCompilationException if String is not a valid expression. 
- */ - public static Expression compileToExpression(String expression) { - SqlParser sqlParser = SqlParser.create(expression, ParserUtils.PARSER_CONFIG); - SqlNode sqlNode; - try { - sqlNode = sqlParser.parseExpression(); - } catch (SqlParseException e) { - throw new SqlCompilationException("Caught exception while parsing expression: " + expression, e); - } - return toExpression(sqlNode); - } - - private static List<Expression> convertDistinctSelectList(SqlNodeList selectList) { - List<Expression> selectExpr = new ArrayList<>(); - selectExpr.add(convertDistinctAndSelectListToFunctionExpression(selectList)); - return selectExpr; - } - - private static List<Expression> convertSelectList(SqlNodeList selectList) { - List<Expression> selectExpr = new ArrayList<>(); - - final Iterator<SqlNode> iterator = selectList.iterator(); - while (iterator.hasNext()) { - final SqlNode next = iterator.next(); - selectExpr.add(toExpression(next)); - } - - return selectExpr; - } - - private static List<Expression> convertOrderByList(SqlNodeList orderList) { - List<Expression> orderByExpr = new ArrayList<>(); - final Iterator<SqlNode> iterator = orderList.iterator(); - while (iterator.hasNext()) { - final SqlNode next = iterator.next(); - orderByExpr.add(convertOrderBy(next)); - } - return orderByExpr; - } - - private static Expression convertOrderBy(SqlNode node) { - final SqlKind kind = node.getKind(); - Expression expression; - switch (kind) { - case DESCENDING: - SqlBasicCall basicCall = (SqlBasicCall) node; - expression = RequestUtils.getFunctionExpression("DESC"); - expression.getFunctionCall().addToOperands(toExpression(basicCall.getOperandList().get(0))); - break; - case IDENTIFIER: - default: - expression = RequestUtils.getFunctionExpression("ASC"); - expression.getFunctionCall().addToOperands(toExpression(node)); - break; - } - return expression; - } - - /** - * DISTINCT is implemented as an aggregation function so need to take the select list items - * and convert them into a single function expression for handing over to execution engine - * either as a PinotQuery or BrokerRequest via conversion - * @param selectList select list items - * @return DISTINCT function expression - */ - private static Expression convertDistinctAndSelectListToFunctionExpression(SqlNodeList selectList) { - String functionName = AggregationFunctionType.DISTINCT.getName(); - Expression functionExpression = RequestUtils.getFunctionExpression(functionName); - for (SqlNode node : selectList) { - Expression columnExpression = toExpression(node); - if (columnExpression.getType() == ExpressionType.IDENTIFIER && columnExpression.getIdentifier().getName() - .equals("*")) { - throw new SqlCompilationException( - "Syntax error: Pinot currently does not support DISTINCT with *. 
Please specify each column name after " - + "DISTINCT keyword"); - } else if (columnExpression.getType() == ExpressionType.FUNCTION) { - Function functionCall = columnExpression.getFunctionCall(); - String function = functionCall.getOperator(); - if (AggregationFunctionType.isAggregationFunction(function)) { - throw new SqlCompilationException( - "Syntax error: Use of DISTINCT with aggregation functions is not supported"); - } - } - functionExpression.getFunctionCall().addToOperands(columnExpression); - } - return functionExpression; - } - - private static Expression toExpression(SqlNode node) { - LOGGER.debug("Current processing SqlNode: {}, node.getKind(): {}", node, node.getKind()); - switch (node.getKind()) { - case IDENTIFIER: - if (((SqlIdentifier) node).isStar()) { - return RequestUtils.getIdentifierExpression("*"); - } - if (((SqlIdentifier) node).isSimple()) { - return RequestUtils.getIdentifierExpression(((SqlIdentifier) node).getSimple()); - } - return RequestUtils.getIdentifierExpression(node.toString()); - case LITERAL: - return RequestUtils.getLiteralExpression((SqlLiteral) node); - case AS: - SqlBasicCall asFuncSqlNode = (SqlBasicCall) node; - List<SqlNode> operands = asFuncSqlNode.getOperandList(); - Expression leftExpr = toExpression(operands.get(0)); - SqlNode aliasSqlNode = operands.get(1); - String aliasName; - switch (aliasSqlNode.getKind()) { - case IDENTIFIER: - aliasName = ((SqlIdentifier) aliasSqlNode).getSimple(); - break; - case LITERAL: - aliasName = ((SqlLiteral) aliasSqlNode).toValue(); - break; - default: - throw new SqlCompilationException("Unsupported Alias sql node - " + aliasSqlNode); - } - Expression rightExpr = RequestUtils.getIdentifierExpression(aliasName); - // Just return left identifier if both sides are the same identifier. - if (leftExpr.isSetIdentifier() && rightExpr.isSetIdentifier()) { - if (leftExpr.getIdentifier().getName().equals(rightExpr.getIdentifier().getName())) { - return leftExpr; - } - } - final Expression asFuncExpr = RequestUtils.getFunctionExpression(SqlKind.AS.toString()); - asFuncExpr.getFunctionCall().addToOperands(leftExpr); - asFuncExpr.getFunctionCall().addToOperands(rightExpr); - return asFuncExpr; - case CASE: - // CASE WHEN Statement is model as a function with variable length parameters. - // Assume N is number of WHEN Statements, total number of parameters is (2 * N + 1). - // - N: Convert each WHEN Statement into a function Expression; - // - N: Convert each THEN Statement into an Expression; - // - 1: Convert ELSE Statement into an Expression. 
- SqlCase caseSqlNode = (SqlCase) node; - SqlNodeList whenOperands = caseSqlNode.getWhenOperands(); - SqlNodeList thenOperands = caseSqlNode.getThenOperands(); - SqlNode elseOperand = caseSqlNode.getElseOperand(); - Expression caseFuncExpr = RequestUtils.getFunctionExpression(SqlKind.CASE.name()); - for (SqlNode whenSqlNode : whenOperands.getList()) { - Expression whenExpression = toExpression(whenSqlNode); - if (isAggregateExpression(whenExpression)) { - throw new SqlCompilationException( - "Aggregation functions inside WHEN Clause is not supported - " + whenSqlNode); - } - caseFuncExpr.getFunctionCall().addToOperands(whenExpression); - } - for (SqlNode thenSqlNode : thenOperands.getList()) { - Expression thenExpression = toExpression(thenSqlNode); - if (isAggregateExpression(thenExpression)) { - throw new SqlCompilationException( - "Aggregation functions inside THEN Clause is not supported - " + thenSqlNode); - } - caseFuncExpr.getFunctionCall().addToOperands(thenExpression); - } - Expression elseExpression = toExpression(elseOperand); - if (isAggregateExpression(elseExpression)) { - throw new SqlCompilationException( - "Aggregation functions inside ELSE Clause is not supported - " + elseExpression); - } - caseFuncExpr.getFunctionCall().addToOperands(elseExpression); - return caseFuncExpr; - default: - if (node instanceof SqlDataTypeSpec) { - // This is to handle expression like: CAST(col AS INT) - return RequestUtils.getLiteralExpression(((SqlDataTypeSpec) node).getTypeName().getSimple()); - } else { - return compileFunctionExpression((SqlBasicCall) node); - } - } - } - - private static Expression compileFunctionExpression(SqlBasicCall functionNode) { - SqlKind functionKind = functionNode.getKind(); - String functionName; - switch (functionKind) { - case AND: - return compileAndExpression(functionNode); - case OR: - return compileOrExpression(functionNode); - case COUNT: - SqlLiteral functionQuantifier = functionNode.getFunctionQuantifier(); - if (functionQuantifier != null && functionQuantifier.toValue().equalsIgnoreCase("DISTINCT")) { - functionName = AggregationFunctionType.DISTINCTCOUNT.name(); - } else { - functionName = AggregationFunctionType.COUNT.name(); - } - break; - case OTHER: - case OTHER_FUNCTION: - case DOT: - functionName = functionNode.getOperator().getName().toUpperCase(); - if (functionName.equals("ITEM") || functionName.equals("DOT")) { - // Calcite parses path expression such as "data[0][1].a.b[0]" into a chain of ITEM and/or DOT - // functions. Collapse this chain into an identifier. - StringBuffer path = new StringBuffer(); - compilePathExpression(functionName, functionNode, path); - return RequestUtils.getIdentifierExpression(path.toString()); - } - break; - default: - functionName = functionKind.name(); - break; - } - // When there is no argument, set an empty list as the operands - List<SqlNode> childNodes = functionNode.getOperandList(); - List<Expression> operands = new ArrayList<>(childNodes.size()); - for (SqlNode childNode : childNodes) { - if (childNode instanceof SqlNodeList) { - for (SqlNode node : (SqlNodeList) childNode) { - operands.add(toExpression(node)); - } - } else { - operands.add(toExpression(childNode)); - } - } - validateFunction(functionName, operands); - Expression functionExpression = RequestUtils.getFunctionExpression(functionName); - functionExpression.getFunctionCall().setOperands(operands); - return functionExpression; - } - - /** - * Convert Calcite operator tree made up of ITEM and DOT functions to an identifier. 
For example, the operator tree - * shown below will be converted to IDENTIFIER "jsoncolumn.data[0][1].a.b[0]". - * - * ├── ITEM(jsoncolumn.data[0][1].a.b[0]) - * ├── LITERAL (0) - * └── DOT (jsoncolumn.daa[0][1].a.b) - * ├── IDENTIFIER (b) - * └── DOT (jsoncolumn.data[0][1].a) - * ├── IDENTIFIER (a) - * └── ITEM (jsoncolumn.data[0][1]) - * ├── LITERAL (1) - * └── ITEM (jsoncolumn.data[0]) - * ├── LITERAL (1) - * └── IDENTIFIER (jsoncolumn.data) - * - * @param functionName Name of the function ("DOT" or "ITEM") - * @param functionNode Root node of the DOT and/or ITEM operator function chain. - * @param path String representation of path represented by DOT and/or ITEM function chain. - */ - private static void compilePathExpression(String functionName, SqlBasicCall functionNode, StringBuffer path) { - List<SqlNode> operands = functionNode.getOperandList(); - - // Compile first operand of the function (either an identifier or another DOT and/or ITEM function). - SqlKind kind0 = operands.get(0).getKind(); - if (kind0 == SqlKind.IDENTIFIER) { - path.append(operands.get(0).toString()); - } else if (kind0 == SqlKind.DOT || kind0 == SqlKind.OTHER_FUNCTION) { - SqlBasicCall function0 = (SqlBasicCall) operands.get(0); - String name0 = function0.getOperator().getName(); - if (name0.equals("ITEM") || name0.equals("DOT")) { - compilePathExpression(name0, function0, path); - } else { - throw new SqlCompilationException("SELECT list item has bad path expression."); - } - } else { - throw new SqlCompilationException("SELECT list item has bad path expression."); - } - - // Compile second operand of the function (either an identifier or literal). - SqlKind kind1 = operands.get(1).getKind(); - if (kind1 == SqlKind.IDENTIFIER) { - path.append(".").append(((SqlIdentifier) operands.get(1)).getSimple()); - } else if (kind1 == SqlKind.LITERAL) { - path.append("[").append(((SqlLiteral) operands.get(1)).toValue()).append("]"); - } else { - throw new SqlCompilationException("SELECT list item has bad path expression."); - } - } - - public static String canonicalize(String functionName) { - return StringUtils.remove(functionName, '_').toLowerCase(); - } - - public static boolean isSameFunction(String function1, String function2) { - return canonicalize(function1).equals(canonicalize(function2)); - } - - private static void validateFunction(String functionName, List<Expression> operands) { - switch (canonicalize(functionName)) { - case "jsonextractscalar": - validateJsonExtractScalarFunction(operands); - break; - case "jsonextractkey": - validateJsonExtractKeyFunction(operands); - break; - default: - break; - } - } - - private static void validateJsonExtractScalarFunction(List<Expression> operands) { - int numOperands = operands.size(); - - // Check that there are exactly 3 or 4 arguments - if (numOperands != 3 && numOperands != 4) { - throw new SqlCompilationException( - "Expect 3 or 4 arguments for transform function: jsonExtractScalar(jsonFieldName, 'jsonPath', " - + "'resultsType', ['defaultValue'])"); - } - if (!operands.get(1).isSetLiteral() || !operands.get(2).isSetLiteral() || (numOperands == 4 && !operands.get(3) - .isSetLiteral())) { - throw new SqlCompilationException( - "Expect the 2nd/3rd/4th argument of transform function: jsonExtractScalar(jsonFieldName, 'jsonPath'," - + " 'resultsType', ['defaultValue']) to be a single-quoted literal value."); - } - } - - private static void validateJsonExtractKeyFunction(List<Expression> operands) { - // Check that there are exactly 2 arguments - if (operands.size() != 
2) { - throw new SqlCompilationException( - "Expect 2 arguments are required for transform function: jsonExtractKey(jsonFieldName, 'jsonPath')"); - } - if (!operands.get(1).isSetLiteral()) { - throw new SqlCompilationException( - "Expect the 2nd argument for transform function: jsonExtractKey(jsonFieldName, 'jsonPath') to be a " - + "single-quoted literal value."); - } - } - - /** - * Helper method to flatten the operands for the AND expression. - */ - private static Expression compileAndExpression(SqlBasicCall andNode) { - List<Expression> operands = new ArrayList<>(); - for (SqlNode childNode : andNode.getOperandList()) { - if (childNode.getKind() == SqlKind.AND) { - Expression childAndExpression = compileAndExpression((SqlBasicCall) childNode); - operands.addAll(childAndExpression.getFunctionCall().getOperands()); - } else { - operands.add(toExpression(childNode)); - } - } - Expression andExpression = RequestUtils.getFunctionExpression(SqlKind.AND.name()); - andExpression.getFunctionCall().setOperands(operands); - return andExpression; - } - - /** - * Helper method to flatten the operands for the OR expression. - */ - private static Expression compileOrExpression(SqlBasicCall orNode) { - List<Expression> operands = new ArrayList<>(); - for (SqlNode childNode : orNode.getOperandList()) { - if (childNode.getKind() == SqlKind.OR) { - Expression childAndExpression = compileOrExpression((SqlBasicCall) childNode); - operands.addAll(childAndExpression.getFunctionCall().getOperands()); - } else { - operands.add(toExpression(childNode)); - } - } - Expression andExpression = RequestUtils.getFunctionExpression(SqlKind.OR.name()); - andExpression.getFunctionCall().setOperands(operands); - return andExpression; - } - - public static boolean isLiteralOnlyExpression(Expression e) { - if (e.getType() == ExpressionType.LITERAL) { - return true; - } - if (e.getType() == ExpressionType.FUNCTION) { - Function functionCall = e.getFunctionCall(); - if (functionCall.getOperator().equalsIgnoreCase(SqlKind.AS.toString())) { - return isLiteralOnlyExpression(functionCall.getOperands().get(0)); - } - return false; - } - return false; - } -} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java new file mode 100644 index 0000000000..91d573c89a --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.parser; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlKind; +import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.request.ExpressionType; +import org.apache.pinot.common.request.Function; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.query.planner.logical.RexExpression; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.sql.parsers.SqlCompilationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Calcite parser to convert SQL expressions into {@link Expression}. + * + * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser}. It contains the logic + * to parsed {@link org.apache.calcite.rex.RexNode}, in the format of {@link RexExpression} and convert them into + * Thrift {@link Expression} format. + */ +public class CalciteRexExpressionParser { + private static final Logger LOGGER = LoggerFactory.getLogger(CalciteRexExpressionParser.class); + + private CalciteRexExpressionParser() { + // do not instantiate. + } + + // -------------------------------------------------------------------------- + // Relational conversion Utils + // -------------------------------------------------------------------------- + + public static List<Expression> convertSelectList(List<RexExpression> rexNodeList, PinotQuery pinotQuery) { + List<Expression> selectExpr = new ArrayList<>(); + + final Iterator<RexExpression> iterator = rexNodeList.iterator(); + while (iterator.hasNext()) { + final RexExpression next = iterator.next(); + selectExpr.add(toExpression(next, pinotQuery)); + } + + return selectExpr; + } + + private static List<Expression> convertDistinctSelectList(RexExpression.FunctionCall rexCall, PinotQuery pinotQuery) { + List<Expression> selectExpr = new ArrayList<>(); + selectExpr.add(convertDistinctAndSelectListToFunctionExpression(rexCall, pinotQuery)); + return selectExpr; + } + + private static List<Expression> convertOrderByList(RexExpression.FunctionCall rexCall, PinotQuery pinotQuery) { + Preconditions.checkState(rexCall.getKind() == SqlKind.ORDER_BY); + List<Expression> orderByExpr = new ArrayList<>(); + + final Iterator<RexExpression> iterator = rexCall.getFunctionOperands().iterator(); + while (iterator.hasNext()) { + final RexExpression next = iterator.next(); + orderByExpr.add(convertOrderBy(next, pinotQuery)); + } + return orderByExpr; + } + + private static Expression convertOrderBy(RexExpression rexNode, PinotQuery pinotQuery) { + final SqlKind kind = rexNode.getKind(); + Expression expression; + switch (kind) { + case DESCENDING: + RexExpression.FunctionCall rexCall = (RexExpression.FunctionCall) rexNode; + expression = RequestUtils.getFunctionExpression("DESC"); + expression.getFunctionCall().addToOperands(toExpression(rexCall.getFunctionOperands().get(0), pinotQuery)); + break; + case IDENTIFIER: + default: + expression = RequestUtils.getFunctionExpression("ASC"); + expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery)); + break; + } + return expression; + } + + private static Expression convertDistinctAndSelectListToFunctionExpression(RexExpression.FunctionCall rexCall, + PinotQuery pinotQuery) { + String functionName = 
AggregationFunctionType.DISTINCT.getName(); + Expression functionExpression = RequestUtils.getFunctionExpression(functionName); + for (RexExpression node : rexCall.getFunctionOperands()) { + Expression columnExpression = toExpression(node, pinotQuery); + if (columnExpression.getType() == ExpressionType.IDENTIFIER && columnExpression.getIdentifier().getName() + .equals("*")) { + throw new SqlCompilationException( + "Syntax error: Pinot currently does not support DISTINCT with *. Please specify each column name after " + + "DISTINCT keyword"); + } else if (columnExpression.getType() == ExpressionType.FUNCTION) { + Function functionCall = columnExpression.getFunctionCall(); + String function = functionCall.getOperator(); + if (AggregationFunctionType.isAggregationFunction(function)) { + throw new SqlCompilationException( + "Syntax error: Use of DISTINCT with aggregation functions is not supported"); + } + } + functionExpression.getFunctionCall().addToOperands(columnExpression); + } + return functionExpression; + } + + public static Expression toExpression(RexExpression rexNode, PinotQuery pinotQuery) { + LOGGER.debug("Current processing RexNode: {}, node.getKind(): {}", rexNode, rexNode.getKind()); + switch (rexNode.getKind()) { + case INPUT_REF: + return inputRefToIdentifier((RexExpression.InputRef) rexNode, pinotQuery); + case LITERAL: + return rexLiteralToExpression((RexExpression.Literal) rexNode); + default: + return compileFunctionExpression((RexExpression.FunctionCall) rexNode, pinotQuery); + } + } + + private static Expression rexLiteralToExpression(RexExpression.Literal rexLiteral) { + RelDataType type = rexLiteral.getDataType(); + switch (type.getSqlTypeName()) { + default: + return RequestUtils.getLiteralExpression(rexLiteral.getValue()); + } + } + + private static Expression inputRefToIdentifier(RexExpression.InputRef inputRef, PinotQuery pinotQuery) { + List<Expression> selectList = pinotQuery.getSelectList(); + return selectList.get(inputRef.getIndex()); + } + + private static Expression compileFunctionExpression(RexExpression.FunctionCall rexCall, PinotQuery pinotQuery) { + SqlKind functionKind = rexCall.getKind(); + String functionName; + switch (functionKind) { + case AND: + return compileAndExpression(rexCall, pinotQuery); + case OR: + return compileOrExpression(rexCall, pinotQuery); + case COUNT: + case OTHER: + case OTHER_FUNCTION: + case DOT: + default: + functionName = functionKind.name(); + break; + } + // When there is no argument, set an empty list as the operands + List<RexExpression> childNodes = rexCall.getFunctionOperands(); + List<Expression> operands = new ArrayList<>(childNodes.size()); + for (RexExpression childNode : childNodes) { + operands.add(toExpression(childNode, pinotQuery)); + } + ParserUtils.validateFunction(functionName, operands); + Expression functionExpression = RequestUtils.getFunctionExpression(functionName); + functionExpression.getFunctionCall().setOperands(operands); + return functionExpression; + } + + /** + * Helper method to flatten the operands for the AND expression. 
+ */ + private static Expression compileAndExpression(RexExpression.FunctionCall andNode, PinotQuery pinotQuery) { + List<Expression> operands = new ArrayList<>(); + for (RexExpression childNode : andNode.getFunctionOperands()) { + if (childNode.getKind() == SqlKind.AND) { + Expression childAndExpression = compileAndExpression((RexExpression.FunctionCall) childNode, pinotQuery); + operands.addAll(childAndExpression.getFunctionCall().getOperands()); + } else { + operands.add(toExpression(childNode, pinotQuery)); + } + } + Expression andExpression = RequestUtils.getFunctionExpression(SqlKind.AND.name()); + andExpression.getFunctionCall().setOperands(operands); + return andExpression; + } + + /** + * Helper method to flatten the operands for the OR expression. + */ + private static Expression compileOrExpression(RexExpression.FunctionCall orNode, PinotQuery pinotQuery) { + List<Expression> operands = new ArrayList<>(); + for (RexExpression childNode : orNode.getFunctionOperands()) { + if (childNode.getKind() == SqlKind.OR) { + Expression childAndExpression = compileOrExpression((RexExpression.FunctionCall) childNode, pinotQuery); + operands.addAll(childAndExpression.getFunctionCall().getOperands()); + } else { + operands.add(toExpression(childNode, pinotQuery)); + } + } + Expression andExpression = RequestUtils.getFunctionExpression(SqlKind.OR.name()); + andExpression.getFunctionCall().setOperands(operands); + return andExpression; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java index d67896f9b9..d2fd054833 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; * This class provide API to parse a SQL string into Pinot query {@link SqlNode}. * * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser}. It contains the logic - * to parsed SQL into {@link SqlNode} and use {@link QueryRewriter} to rewrite the query with Pinot specific + * to parsed SQL string into {@link SqlNode} and use {@link QueryRewriter} to rewrite the query with Pinot specific * contextual info. */ public class CalciteSqlParser { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java index 5422382509..ec6494fea2 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java @@ -18,11 +18,15 @@ */ package org.apache.pinot.query.parser; +import java.util.List; import java.util.regex.Pattern; import org.apache.calcite.config.Lex; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.sql.parser.babel.SqlBabelParserImpl; import org.apache.calcite.sql.validate.SqlConformanceEnum; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.request.Expression; +import org.apache.pinot.sql.parsers.SqlCompilationException; /** @@ -60,4 +64,55 @@ final class ParserUtils { private ParserUtils() { // do not instantiate. 
} + + public static String canonicalize(String functionName) { + return StringUtils.remove(functionName, '_').toLowerCase(); + } + + public static boolean isSameFunction(String function1, String function2) { + return canonicalize(function1).equals(canonicalize(function2)); + } + + public static void validateFunction(String functionName, List<Expression> operands) { + switch (canonicalize(functionName)) { + case "jsonextractscalar": + validateJsonExtractScalarFunction(operands); + break; + case "jsonextractkey": + validateJsonExtractKeyFunction(operands); + break; + default: + break; + } + } + + private static void validateJsonExtractScalarFunction(List<Expression> operands) { + int numOperands = operands.size(); + + // Check that there are exactly 3 or 4 arguments + if (numOperands != 3 && numOperands != 4) { + throw new SqlCompilationException( + "Expect 3 or 4 arguments for transform function: jsonExtractScalar(jsonFieldName, 'jsonPath', " + + "'resultsType', ['defaultValue'])"); + } + if (!operands.get(1).isSetLiteral() || !operands.get(2).isSetLiteral() || (numOperands == 4 && !operands.get(3) + .isSetLiteral())) { + throw new SqlCompilationException( + "Expect the 2nd/3rd/4th argument of transform function: jsonExtractScalar(jsonFieldName, 'jsonPath'," + + " 'resultsType', ['defaultValue']) to be a single-quoted literal value."); + } + } + + private static void validateJsonExtractKeyFunction(List<Expression> operands) { + // Check that there are exactly 2 arguments + if (operands.size() != 2) { + throw new SqlCompilationException( + "Expect 2 arguments are required for transform function: jsonExtractKey(jsonFieldName, 'jsonPath')"); + } + if (!operands.get(1).isSetLiteral()) { + throw new SqlCompilationException( + "Expect the 2nd argument for transform function: jsonExtractKey(jsonFieldName, 'jsonPath') to be a " + + "single-quoted literal value."); + } + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java index cb075bb9f3..a93bddc4f7 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java @@ -19,7 +19,8 @@ package org.apache.pinot.query.planner; import java.util.Map; -import org.apache.pinot.query.planner.nodes.StageNode; +import org.apache.pinot.query.planner.logical.LogicalPlanner; +import org.apache.pinot.query.planner.stage.StageNode; /** diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java index 90dd22a8aa..8e691003c9 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java @@ -24,8 +24,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.core.transport.ServerInstance; -import org.apache.pinot.query.planner.nodes.StageNode; -import org.apache.pinot.query.planner.nodes.TableScanNode; +import org.apache.pinot.query.planner.stage.StageNode; +import org.apache.pinot.query.planner.stage.TableScanNode; /** diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java similarity index 62% copy from 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java copy to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java index 0aa8c94ec8..19a9daa54f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java @@ -16,23 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.planner.nodes; +package org.apache.pinot.query.planner.hints; +import org.apache.calcite.rel.hint.RelHint; -public class CalcNode extends AbstractStageNode { - private String _expression; - - public CalcNode(int stageId) { - super(stageId); - } - - public CalcNode(int stageId, String expression) { - super(stageId); - _expression = expression; - } +/** + * Provide certain relational hint to query planner for better optimization. + */ +public class PinotRelationalHints { + public static final RelHint USE_HASH_DISTRIBUTE = RelHint.builder("USE_HASH_DISTRIBUTE").build(); + public static final RelHint USE_BROADCAST_DISTRIBUTE = RelHint.builder("USE_BROADCAST_DISTRIBUTE").build(); - public String getExpression() { - return _expression; + private PinotRelationalHints() { + // do not instantiate. } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java similarity index 97% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java index 9844916490..0e317560e9 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.planner; +package org.apache.pinot.query.planner.logical; import java.util.ArrayList; import java.util.List; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java similarity index 75% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java index 572302ef92..3750437f7f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.query.planner; +package org.apache.pinot.query.planner.logical; import com.google.common.base.Preconditions; import java.util.Collections; @@ -24,19 +24,21 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rel.logical.LogicalCalc; +import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.sql.SqlKind; -import org.apache.pinot.query.planner.nodes.CalcNode; -import org.apache.pinot.query.planner.nodes.JoinNode; -import org.apache.pinot.query.planner.nodes.StageNode; -import org.apache.pinot.query.planner.nodes.TableScanNode; import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; +import org.apache.pinot.query.planner.stage.FilterNode; +import org.apache.pinot.query.planner.stage.JoinNode; +import org.apache.pinot.query.planner.stage.ProjectNode; +import org.apache.pinot.query.planner.stage.StageNode; +import org.apache.pinot.query.planner.stage.TableScanNode; /** @@ -57,27 +59,32 @@ public final class RelToStageConverter { * @return stage node. */ public static StageNode toStageNode(RelNode node, int currentStageId) { - if (node instanceof LogicalCalc) { - return convertLogicalCal((LogicalCalc) node, currentStageId); - } else if (node instanceof LogicalTableScan) { + if (node instanceof LogicalTableScan) { return convertLogicalTableScan((LogicalTableScan) node, currentStageId); } else if (node instanceof LogicalJoin) { return convertLogicalJoin((LogicalJoin) node, currentStageId); + } else if (node instanceof LogicalProject) { + return convertLogicalProject((LogicalProject) node, currentStageId); + } else if (node instanceof LogicalFilter) { + return convertLogicalFilter((LogicalFilter) node, currentStageId); } else { throw new UnsupportedOperationException("Unsupported logical plan node: " + node); } } + private static StageNode convertLogicalProject(LogicalProject node, int currentStageId) { + return new ProjectNode(currentStageId, node.getRowType(), node.getProjects()); + } + + private static StageNode convertLogicalFilter(LogicalFilter node, int currentStageId) { + return new FilterNode(currentStageId, node.getRowType(), node.getCondition()); + } + private static StageNode convertLogicalTableScan(LogicalTableScan node, int currentStageId) { String tableName = node.getTable().getQualifiedName().get(0); List<String> columnNames = node.getRowType().getFieldList().stream() .map(RelDataTypeField::getName).collect(Collectors.toList()); - return new TableScanNode(currentStageId, tableName, columnNames); - } - - private static StageNode convertLogicalCal(LogicalCalc node, int currentStageId) { - // TODO: support actual calcNode - return new CalcNode(currentStageId, node.getDigest()); + return new TableScanNode(currentStageId, node.getRowType(), tableName, columnNames); } private static StageNode convertLogicalJoin(LogicalJoin node, int currentStageId) { @@ -95,7 +102,7 @@ public final class RelToStageConverter { FieldSelectionKeySelector leftFieldSelectionKeySelector = new FieldSelectionKeySelector(leftOperandIndex); FieldSelectionKeySelector 
rightFieldSelectionKeySelector = new FieldSelectionKeySelector(rightOperandIndex - leftRowType.getFieldNames().size()); - return new JoinNode(currentStageId, joinType, Collections.singletonList(new JoinNode.JoinClause( + return new JoinNode(currentStageId, node.getRowType(), joinType, Collections.singletonList(new JoinNode.JoinClause( leftFieldSelectionKeySelector, rightFieldSelectionKeySelector))); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java new file mode 100644 index 0000000000..899636ab4d --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import java.math.BigDecimal; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.NlsString; +import org.apache.pinot.query.planner.serde.ProtoProperties; +import org.checkerframework.checker.nullness.qual.Nullable; + + +/** + * {@code RexExpression} is the serializable format of the {@link RexNode}. 
+ */ +public abstract class RexExpression { + @ProtoProperties + protected SqlKind _sqlKind; + @ProtoProperties + protected RelDataType _dataType; + + public SqlKind getKind() { + return _sqlKind; + } + + public RelDataType getDataType() { + return _dataType; + } + + public static RexExpression toRexExpression(RexNode rexNode) { + if (rexNode instanceof RexInputRef) { + return new RexExpression.InputRef(((RexInputRef) rexNode).getIndex()); + } else if (rexNode instanceof RexLiteral) { + RexLiteral rexLiteral = ((RexLiteral) rexNode); + return new RexExpression.Literal(rexLiteral.getType(), rexLiteral.getTypeName(), rexLiteral.getValue()); + } else if (rexNode instanceof RexCall) { + RexCall rexCall = (RexCall) rexNode; + List<RexExpression> operands = rexCall.getOperands().stream().map(RexExpression::toRexExpression) + .collect(Collectors.toList()); + return new RexExpression.FunctionCall(rexCall.getKind(), rexCall.getType(), rexCall.getOperator().getName(), + operands); + } else { + throw new IllegalArgumentException("Unsupported RexNode type with SqlKind: " + rexNode.getKind()); + } + } + + private static Comparable convertLiteral(Comparable value, SqlTypeName sqlTypeName, RelDataType dataType) { + switch (sqlTypeName) { + case BOOLEAN: + return (boolean) value; + case DECIMAL: + switch (dataType.getSqlTypeName()) { + case INTEGER: + return ((BigDecimal) value).intValue(); + case BIGINT: + return ((BigDecimal) value).longValue(); + case FLOAT: + return ((BigDecimal) value).floatValue(); + case DOUBLE: + default: + return ((BigDecimal) value).doubleValue(); + } + case CHAR: + switch (dataType.getSqlTypeName()) { + case VARCHAR: + return ((NlsString) value).getValue(); + default: + return value; + } + default: + return value; + } + } + + public static class InputRef extends RexExpression { + @ProtoProperties + private int _index; + + public InputRef() { + } + + public InputRef(int index) { + _sqlKind = SqlKind.INPUT_REF; + _index = index; + } + + public int getIndex() { + return _index; + } + } + + public static class Literal extends RexExpression { + @ProtoProperties + private Object _value; + + public Literal() { + } + + public Literal(RelDataType dataType, SqlTypeName sqlTypeName, @Nullable Comparable value) { + _sqlKind = SqlKind.LITERAL; + _dataType = dataType; + _value = convertLiteral(value, sqlTypeName, dataType); + } + + public Object getValue() { + return _value; + } + } + + public static class FunctionCall extends RexExpression { + @ProtoProperties + private String _functionName; + @ProtoProperties + private List<RexExpression> _functionOperands; + + public FunctionCall() { + } + + public FunctionCall(SqlKind sqlKind, RelDataType type, String functionName, List<RexExpression> functionOperands) { + _sqlKind = sqlKind; + _dataType = type; + _functionName = functionName; + _functionOperands = functionOperands; + } + + public String getFunctionName() { + return _functionName; + } + + public List<RexExpression> getFunctionOperands() { + return _functionOperands; + } + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java similarity index 79% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java index f6ec38738c..b5dfab8689 100644 --- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.planner; +package org.apache.pinot.query.planner.logical; import java.util.HashMap; import java.util.List; @@ -26,9 +26,12 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.logical.LogicalExchange; import org.apache.pinot.query.context.PlannerContext; -import org.apache.pinot.query.planner.nodes.MailboxReceiveNode; -import org.apache.pinot.query.planner.nodes.MailboxSendNode; -import org.apache.pinot.query.planner.nodes.StageNode; +import org.apache.pinot.query.planner.QueryPlan; +import org.apache.pinot.query.planner.StageMetadata; +import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; +import org.apache.pinot.query.planner.stage.MailboxReceiveNode; +import org.apache.pinot.query.planner.stage.MailboxSendNode; +import org.apache.pinot.query.planner.stage.StageNode; import org.apache.pinot.query.routing.WorkerManager; @@ -66,12 +69,13 @@ public class StagePlanner { // walk the plan and create stages. StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId()); - // global root needs to send results back to the ROOT, a.k.a. the client response node. - // the last stage is always a broadcast-gather. + // global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one + // receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default. StageNode globalReceiverNode = - new MailboxReceiveNode(0, globalStageRoot.getStageId(), RelDistribution.Type.BROADCAST_DISTRIBUTED); - StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), globalReceiverNode.getStageId(), - RelDistribution.Type.BROADCAST_DISTRIBUTED); + new MailboxReceiveNode(0, relRoot.getRowType(), globalStageRoot.getStageId(), + RelDistribution.Type.SINGLETON); + StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), relRoot.getRowType(), + globalReceiverNode.getStageId(), RelDistribution.Type.SINGLETON); globalSenderNode.addInput(globalStageRoot); _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode); StageMetadata stageMetadata = _stageMetadataMap.get(globalSenderNode.getStageId()); @@ -95,12 +99,15 @@ public class StagePlanner { if (isExchangeNode(node)) { // 1. exchangeNode always have only one input, get its input converted as a new stage root. StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId()); - RelDistribution.Type exchangeType = ((LogicalExchange) node).distribution.getType(); + RelDistribution distribution = ((LogicalExchange) node).getDistribution(); + RelDistribution.Type exchangeType = distribution.getType(); // 2. 
make an exchange sender and receiver node pair - StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getStageId(), exchangeType); - StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), mailboxReceiver.getStageId(), + StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, node.getRowType(), nextStageRoot.getStageId(), exchangeType); + StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), node.getRowType(), + mailboxReceiver.getStageId(), exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED + ? new FieldSelectionKeySelector(distribution.getKeys().get(0)) : null); mailboxSender.addInput(nextStageRoot); // 3. put the sender side as a completed stage. diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java index 95991d558b..14f263c44f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java @@ -18,12 +18,15 @@ */ package org.apache.pinot.query.planner.partitioning; +import org.apache.pinot.query.planner.serde.ProtoProperties; + /** * The {@code FieldSelectionKeySelector} simply extract a column value out from a row array {@link Object[]}. */ public class FieldSelectionKeySelector implements KeySelector<Object[], Object> { + @ProtoProperties private int _columnIndex; public FieldSelectionKeySelector() { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java index eaefb77604..e6b6e598a2 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java @@ -18,8 +18,6 @@ */ package org.apache.pinot.query.planner.partitioning; -import java.io.Serializable; - /** * The {@code KeySelector} provides a partitioning function to encode a specific input data type into a key. @@ -28,7 +26,7 @@ import java.io.Serializable; * * <p>Key selector should always produce the same selection hash key when the same input is provided. */ -public interface KeySelector<IN, OUT> extends Serializable { +public interface KeySelector<IN, OUT> { /** * Extract the key out of an input data construct. 
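A quick note on the KeySelector change above: dropping the Serializable bound lines up with key selectors now travelling through the @ProtoProperties-based proto serde (FieldSelectionKeySelector annotates _columnIndex) instead of Java serialization. The snippet below is only an illustration of how a hash exchange could use a selector; it assumes the interface exposes a getKey(IN) accessor, which this hunk does not show, and the hashing/routing step is invented for the example, not the committed MailboxSendOperator logic.

    // Illustrative only: partition rows on column index 1, e.g. the join key chosen in RelToStageConverter.
    int numServers = 4;                                          // hypothetical receiver count
    KeySelector<Object[], Object> keySelector = new FieldSelectionKeySelector(1);
    Object[] row = new Object[]{"memberA", 42L, 3.14};
    Object key = keySelector.getKey(row);                        // assumed accessor, returns 42L here
    int partition = Math.floorMod(key.hashCode(), numServers);   // illustrative routing, not the committed logic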
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java similarity index 63% copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java copy to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java index 2b99003e87..5a10b91941 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java @@ -22,14 +22,19 @@ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated */ -package org.apache.pinot.query.planner.nodes.serde; +package org.apache.pinot.query.planner.serde; -import org.apache.pinot.common.proto.Plan; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; -public interface ProtoSerializable { - - void setObjectField(Plan.ObjectField objFields); - - Plan.ObjectField getObjectField(); +/** + * Annotation {@code ProtoProperties} indicates whether a field defined in a + * {@link org.apache.pinot.query.planner.stage.StageNode} should be serialized. + */ +@Target({ElementType.ANNOTATION_TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER}) +@Retention(RetentionPolicy.RUNTIME) +public @interface ProtoProperties { } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoSerializable.java similarity index 62% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoSerializable.java index 2b99003e87..f10cb9dd35 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoSerializable.java @@ -22,14 +22,28 @@ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated */ -package org.apache.pinot.query.planner.nodes.serde; +package org.apache.pinot.query.planner.serde; import org.apache.pinot.common.proto.Plan; +/** + * Interface to convert between proto serialized payload and object. + * + * <p>Classes that implement {@code ProtoSerializable} should provide methods to convert to and from + * {@link Plan.ObjectField}. + */ public interface ProtoSerializable { - void setObjectField(Plan.ObjectField objFields); + /** + * Setting object's own member variable from a serialized {@link Plan.ObjectField}. + * @param objFields the serialized ObjectField. + */ + void fromObjectField(Plan.ObjectField objFields); - Plan.ObjectField getObjectField(); + /** + * convert the object to a serialized {@link Plan.ObjectField}. + * @return the serialized ObjectField. 
+ */ + Plan.ObjectField toObjectField(); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoSerializationUtils.java similarity index 61% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoSerializationUtils.java index c30295a101..d0d3b1dc3e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoSerializationUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.planner.nodes.serde; +package org.apache.pinot.query.planner.serde; import com.google.common.base.Preconditions; import java.lang.reflect.Field; @@ -28,48 +28,75 @@ import java.util.Set; import org.apache.pinot.common.proto.Plan; +/** + * Utils to convert automatically from/to object that's implementing {@link ProtoSerializable}. + */ @SuppressWarnings({"rawtypes", "unchecked"}) public class ProtoSerializationUtils { private static final String ENUM_VALUE_KEY = "ENUM_VALUE_KEY"; + private static final String NULL_OBJECT_CLASSNAME = "null"; + private static final Plan.ObjectField NULL_OBJECT_VALUE = Plan.ObjectField.newBuilder() + .setObjectClassName(NULL_OBJECT_CLASSNAME).build(); private ProtoSerializationUtils() { // do not instantiate. } - public static void fromObjectField(Object object, Plan.ObjectField objectField) { + /** + * Reflectively set object's field based on {@link Plan.ObjectField} provided. + * + * @param object the object to be set. + * @param objectField the proto ObjectField from which the object will be set. 
+ */ + public static void setObjectFieldToObject(Object object, Plan.ObjectField objectField) { Map<String, Plan.MemberVariableField> memberVariablesMap = objectField.getMemberVariablesMap(); - try { - for (Map.Entry<String, Plan.MemberVariableField> e : memberVariablesMap.entrySet()) { - Object memberVarObject = constructMemberVariable(e.getValue()); - if (memberVarObject != null) { - Field declaredField = object.getClass().getDeclaredField(e.getKey()); - declaredField.setAccessible(true); - declaredField.set(object, memberVarObject); + for (Map.Entry<String, Plan.MemberVariableField> e : memberVariablesMap.entrySet()) { + try { + Field declaredField = object.getClass().getDeclaredField(e.getKey()); + if (declaredField.isAnnotationPresent(ProtoProperties.class)) { + Object memberVarObject = constructMemberVariable(e.getValue()); + if (memberVarObject != null) { + declaredField.setAccessible(true); + declaredField.set(object, memberVarObject); + } } + } catch (NoSuchFieldException | IllegalAccessException ex) { + throw new IllegalStateException("Unable to set Object " + object.getClass() + " on field " + e.getKey() + + "with object of type: " + objectField.getObjectClassName(), ex); } - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new IllegalStateException("Unable to set Object field for: " + objectField.getObjectClassName(), e); } } - public static Plan.ObjectField toObjectField(Object object) { - Plan.ObjectField.Builder builder = Plan.ObjectField.newBuilder(); - builder.setObjectClassName(object.getClass().getName()); - // special handling for enum - if (object instanceof Enum) { - builder.putMemberVariables(ENUM_VALUE_KEY, serializeMemberVariable(((Enum) object).name())); - } else { - try { - for (Field field : object.getClass().getDeclaredFields()) { - field.setAccessible(true); - Object fieldObject = field.get(object); - builder.putMemberVariables(field.getName(), serializeMemberVariable(fieldObject)); + /** + * Convert object into a proto {@link Plan.ObjectField}. + * + * @param object object to be converted. + * @return the converted proto ObjectField. 
+ */ + public static Plan.ObjectField convertObjectToObjectField(Object object) { + if (object != null) { + Plan.ObjectField.Builder builder = Plan.ObjectField.newBuilder(); + builder.setObjectClassName(object.getClass().getName()); + // special handling for enum + if (object instanceof Enum) { + builder.putMemberVariables(ENUM_VALUE_KEY, serializeMemberVariable(((Enum) object).name())); + } else { + try { + for (Field field : object.getClass().getDeclaredFields()) { + if (field.isAnnotationPresent(ProtoProperties.class)) { + field.setAccessible(true); + Object fieldObject = field.get(object); + builder.putMemberVariables(field.getName(), serializeMemberVariable(fieldObject)); + } + } + } catch (IllegalAccessException e) { + throw new IllegalStateException("Unable to serialize Object: " + object.getClass(), e); } - } catch (IllegalAccessException e) { - throw new IllegalStateException("Unable to serialize Object: " + object.getClass(), e); } + return builder.build(); + } else { + return NULL_OBJECT_VALUE; } - return builder.build(); } // -------------------------------------------------------------------------- @@ -88,6 +115,10 @@ public class ProtoSerializationUtils { return Plan.LiteralField.newBuilder().setLongField(val).build(); } + private static Plan.LiteralField floatField(float val) { + return Plan.LiteralField.newBuilder().setFloatField(val).build(); + } + private static Plan.LiteralField doubleField(double val) { return Plan.LiteralField.newBuilder().setDoubleField(val).build(); } @@ -104,6 +135,8 @@ public class ProtoSerializationUtils { builder.setLiteralField(intField((Integer) fieldObject)); } else if (fieldObject instanceof Long) { builder.setLiteralField(longField((Long) fieldObject)); + } else if (fieldObject instanceof Float) { + builder.setLiteralField(floatField((Float) fieldObject)); } else if (fieldObject instanceof Double) { builder.setLiteralField(doubleField((Double) fieldObject)); } else if (fieldObject instanceof String) { @@ -113,7 +146,7 @@ public class ProtoSerializationUtils { } else if (fieldObject instanceof Map) { builder.setMapField(serializeMapMemberVariable(fieldObject)); } else { - builder.setObjectField(toObjectField(fieldObject)); + builder.setObjectField(convertObjectToObjectField(fieldObject)); } return builder.build(); } @@ -165,6 +198,8 @@ public class ProtoSerializationUtils { return literalField.getIntField(); case LONGFIELD: return literalField.getLongField(); + case FLOATFIELD: + return literalField.getFloatField(); case DOUBLEFIELD: return literalField.getDoubleField(); case STRINGFIELD: @@ -183,7 +218,7 @@ public class ProtoSerializationUtils { return list; } - private static Object constructMap(Plan.MapField mapField) { + private static Map constructMap(Plan.MapField mapField) { Map map = new HashMap(); for (Map.Entry<String, Plan.MemberVariableField> e : mapField.getContentMap().entrySet()) { map.put(e.getKey(), constructMemberVariable(e.getValue())); @@ -192,18 +227,22 @@ public class ProtoSerializationUtils { } private static Object constructObject(Plan.ObjectField objectField) { - try { - Class<?> clazz = Class.forName(objectField.getObjectClassName()); - if (clazz.isEnum()) { - return Enum.valueOf((Class<Enum>) clazz, - objectField.getMemberVariablesOrDefault(ENUM_VALUE_KEY, null).getLiteralField().getStringField()); - } else { - Object obj = clazz.newInstance(); - fromObjectField(obj, objectField); - return obj; + if (!NULL_OBJECT_CLASSNAME.equals(objectField.getObjectClassName())) { + try { + Class<?> clazz = 
Class.forName(objectField.getObjectClassName()); + if (clazz.isEnum()) { + return Enum.valueOf((Class<Enum>) clazz, + objectField.getMemberVariablesOrDefault(ENUM_VALUE_KEY, null).getLiteralField().getStringField()); + } else { + Object obj = clazz.newInstance(); + setObjectFieldToObject(obj, objectField); + return obj; + } + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalStateException("Unable to create Object of type: " + objectField.getObjectClassName(), e); } - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - throw new IllegalStateException("Unable to create Object of type: " + objectField.getObjectClassName(), e); + } else { + return null; } } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java similarity index 67% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java index ed1fc9ba3e..bdcccea355 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java @@ -16,19 +16,25 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.planner.nodes; +package org.apache.pinot.query.planner.stage; import java.util.ArrayList; import java.util.List; +import org.apache.calcite.rel.type.RelDataType; import org.apache.pinot.common.proto.Plan; -import org.apache.pinot.query.planner.nodes.serde.ProtoSerializable; -import org.apache.pinot.query.planner.nodes.serde.ProtoSerializationUtils; +import org.apache.pinot.query.planner.serde.ProtoProperties; +import org.apache.pinot.query.planner.serde.ProtoSerializable; +import org.apache.pinot.query.planner.serde.ProtoSerializationUtils; public abstract class AbstractStageNode implements StageNode, ProtoSerializable { + @ProtoProperties protected final int _stageId; + @ProtoProperties protected final List<StageNode> _inputs; + @ProtoProperties + protected RelDataType _rowType; public AbstractStageNode(int stageId) { _stageId = stageId; @@ -51,12 +57,16 @@ public abstract class AbstractStageNode implements StageNode, ProtoSerializable } @Override - public void setObjectField(Plan.ObjectField objectField) { - ProtoSerializationUtils.fromObjectField(this, objectField); + public void fromObjectField(Plan.ObjectField objectField) { + ProtoSerializationUtils.setObjectFieldToObject(this, objectField); } @Override - public Plan.ObjectField getObjectField() { - return ProtoSerializationUtils.toObjectField(this); + public Plan.ObjectField toObjectField() { + return ProtoSerializationUtils.convertObjectToObjectField(this); + } + + public RelDataType getRowType() { + return _rowType; } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java similarity index 56% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java index 0aa8c94ec8..52df4ed5d3 100644 --- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java @@ -16,23 +16,29 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.planner.nodes; +package org.apache.pinot.query.planner.stage; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.pinot.query.planner.logical.RexExpression; +import org.apache.pinot.query.planner.serde.ProtoProperties; -public class CalcNode extends AbstractStageNode { - private String _expression; +public class FilterNode extends AbstractStageNode { + @ProtoProperties + private RexExpression _condition; - public CalcNode(int stageId) { + public FilterNode(int stageId) { super(stageId); } - public CalcNode(int stageId, String expression) { - super(stageId); - _expression = expression; + public FilterNode(int currentStageId, RelDataType rowType, RexNode condition) { + super(currentStageId); + super._rowType = rowType; + _condition = RexExpression.toRexExpression(condition); } - public String getExpression() { - return _expression; + public RexExpression getCondition() { + return _condition; } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java similarity index 84% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java index bf380639d8..96b6c43a95 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java @@ -16,25 +16,29 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.query.planner.nodes; +package org.apache.pinot.query.planner.stage; import java.util.List; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.partitioning.KeySelector; +import org.apache.pinot.query.planner.serde.ProtoProperties; public class JoinNode extends AbstractStageNode { + @ProtoProperties private JoinRelType _joinRelType; + @ProtoProperties private List<JoinClause> _criteria; public JoinNode(int stageId) { super(stageId); } - public JoinNode(int stageId, JoinRelType joinRelType, List<JoinClause> criteria - ) { + public JoinNode(int stageId, RelDataType rowType, JoinRelType joinRelType, List<JoinClause> criteria) { super(stageId); + super._rowType = rowType; _joinRelType = joinRelType; _criteria = criteria; } @@ -48,7 +52,9 @@ public class JoinNode extends AbstractStageNode { } public static class JoinClause { + @ProtoProperties private KeySelector<Object[], Object> _leftJoinKeySelector; + @ProtoProperties private KeySelector<Object[], Object> _rightJoinKeySelector; public JoinClause() { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java similarity index 79% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java index 8f0c619b79..1c01d8de5c 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java @@ -16,21 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.planner.nodes; +package org.apache.pinot.query.planner.stage; import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.pinot.query.planner.serde.ProtoProperties; public class MailboxReceiveNode extends AbstractStageNode { + @ProtoProperties private int _senderStageId; + @ProtoProperties private RelDistribution.Type _exchangeType; public MailboxReceiveNode(int stageId) { super(stageId); } - public MailboxReceiveNode(int stageId, int senderStageId, RelDistribution.Type exchangeType) { + public MailboxReceiveNode(int stageId, RelDataType rowType, int senderStageId, RelDistribution.Type exchangeType) { super(stageId); + super._rowType = rowType; _senderStageId = senderStageId; _exchangeType = exchangeType; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java similarity index 56% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java index 9867a16f61..c3f540aa0e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java @@ -16,23 +16,39 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.query.planner.nodes; +package org.apache.pinot.query.planner.stage; +import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.pinot.query.planner.partitioning.KeySelector; +import org.apache.pinot.query.planner.serde.ProtoProperties; public class MailboxSendNode extends AbstractStageNode { + @ProtoProperties private int _receiverStageId; + @ProtoProperties private RelDistribution.Type _exchangeType; + @ProtoProperties + private KeySelector<Object[], Object> _partitionKeySelector; public MailboxSendNode(int stageId) { super(stageId); } - public MailboxSendNode(int stageId, int receiverStageId, RelDistribution.Type exchangeType) { + public MailboxSendNode(int stageId, RelDataType rowType, int receiverStageId, RelDistribution.Type exchangeType) { + // When exchangeType is not HASH_DISTRIBUTE, no partitionKeySelector is needed. + this(stageId, rowType, receiverStageId, exchangeType, null); + } + + public MailboxSendNode(int stageId, RelDataType rowType, int receiverStageId, RelDistribution.Type exchangeType, + @Nullable KeySelector<Object[], Object> partitionKeySelector) { super(stageId); + super._rowType = rowType; _receiverStageId = receiverStageId; _exchangeType = exchangeType; + _partitionKeySelector = partitionKeySelector; } public int getReceiverStageId() { @@ -42,4 +58,8 @@ public class MailboxSendNode extends AbstractStageNode { public RelDistribution.Type getExchangeType() { return _exchangeType; } + + public KeySelector<Object[], Object> getPartitionKeySelector() { + return _partitionKeySelector; + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java similarity index 51% copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java copy to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java index 9375a7e986..4ee9be6c0a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java @@ -16,30 +16,34 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.query.planner.nodes; +package org.apache.pinot.query.planner.stage; import java.util.List; +import java.util.stream.Collectors; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.pinot.query.planner.logical.RexExpression; +import org.apache.pinot.query.planner.serde.ProtoProperties; -public class TableScanNode extends AbstractStageNode { - private String _tableName; - private List<String> _tableScanColumns; +public class ProjectNode extends AbstractStageNode { + @ProtoProperties + private List<RexExpression> _projects; - public TableScanNode(int stageId) { + public ProjectNode(int stageId) { super(stageId); } - - public TableScanNode(int stageId, String tableName, List<String> tableScanColumns) { - super(stageId); - _tableName = tableName; - _tableScanColumns = tableScanColumns; + public ProjectNode(int currentStageId, RelDataType rowType, List<RexNode> projects) { + super(currentStageId); + super._rowType = rowType; + _projects = projects.stream().map(RexExpression::toRexExpression).collect(Collectors.toList()); } - public String getTableName() { - return _tableName; + public List<RexExpression> getProjects() { + return _projects; } - public List<String> getTableScanColumns() { - return _tableScanColumns; + public RelDataType getRowType() { + return _rowType; } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java similarity index 96% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java index cd34aca530..45e65a8c21 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.planner.nodes; +package org.apache.pinot.query.planner.stage; import java.io.Serializable; import java.util.List; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java similarity index 85% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java index ad7184cdb1..3d34f6effb 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java @@ -16,19 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.planner.nodes; +package org.apache.pinot.query.planner.stage; import org.apache.pinot.common.proto.Plan; -public final class SerDeUtils { - private SerDeUtils() { +public final class StageNodeSerDeUtils { + private StageNodeSerDeUtils() { // do not instantiate. 
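As context for the @ProtoProperties-gated serialization used throughout the stage nodes above, here is a minimal sketch of the general pattern: reflect over declared fields, copy only the annotated ones out, and write them back on deserialization. It deliberately uses a hypothetical @Persisted marker and a plain Map in place of Plan.ObjectField, so it is an illustration of the technique, not the Pinot implementation.

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import java.lang.reflect.Field;
    import java.util.HashMap;
    import java.util.Map;

    public class AnnotatedSerDeSketch {
      // Stand-in for @ProtoProperties: only fields carrying this marker survive the round trip.
      @Target(ElementType.FIELD)
      @Retention(RetentionPolicy.RUNTIME)
      @interface Persisted {
      }

      static class Node {
        @Persisted
        String _tableName;
        // Not annotated, so the round trip below skips it.
        Object _scratch;
      }

      // Collects annotated field values into a map (the sketch's stand-in for Plan.ObjectField).
      static Map<String, Object> toMap(Object obj) throws IllegalAccessException {
        Map<String, Object> out = new HashMap<>();
        for (Field f : obj.getClass().getDeclaredFields()) {
          if (f.isAnnotationPresent(Persisted.class)) {
            f.setAccessible(true);
            out.put(f.getName(), f.get(obj));
          }
        }
        return out;
      }

      // Writes map entries back onto annotated fields, mirroring the deserialization direction.
      static void fromMap(Object obj, Map<String, Object> in)
          throws NoSuchFieldException, IllegalAccessException {
        for (Map.Entry<String, Object> e : in.entrySet()) {
          Field f = obj.getClass().getDeclaredField(e.getKey());
          if (f.isAnnotationPresent(Persisted.class)) {
            f.setAccessible(true);
            f.set(obj, e.getValue());
          }
        }
      }

      public static void main(String[] args) throws Exception {
        Node original = new Node();
        original._tableName = "a_REALTIME";
        Node copy = new Node();
        fromMap(copy, toMap(original));
        System.out.println(copy._tableName); // prints a_REALTIME
      }
    }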
} public static AbstractStageNode deserializeStageNode(Plan.StageNode protoNode) { AbstractStageNode stageNode = newNodeInstance(protoNode.getNodeName(), protoNode.getStageId()); - stageNode.setObjectField(protoNode.getObjectField()); + stageNode.fromObjectField(protoNode.getObjectField()); for (Plan.StageNode protoChild : protoNode.getInputsList()) { stageNode.addInput(deserializeStageNode(protoChild)); } @@ -39,7 +39,7 @@ public final class SerDeUtils { Plan.StageNode.Builder builder = Plan.StageNode.newBuilder() .setStageId(stageNode.getStageId()) .setNodeName(stageNode.getClass().getSimpleName()) - .setObjectField(stageNode.getObjectField()); + .setObjectField(stageNode.toObjectField()); for (StageNode childNode : stageNode.getInputs()) { builder.addInputs(serializeStageNode((AbstractStageNode) childNode)); } @@ -52,8 +52,10 @@ public final class SerDeUtils { return new TableScanNode(stageId); case "JoinNode": return new JoinNode(stageId); - case "CalcNode": - return new CalcNode(stageId); + case "ProjectNode": + return new ProjectNode(stageId); + case "FilterNode": + return new FilterNode(stageId); case "MailboxSendNode": return new MailboxSendNode(stageId); case "MailboxReceiveNode": diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java similarity index 79% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java index 9375a7e986..9ba36d34f3 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java @@ -16,21 +16,26 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.query.planner.nodes; +package org.apache.pinot.query.planner.stage; import java.util.List; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.pinot.query.planner.serde.ProtoProperties; public class TableScanNode extends AbstractStageNode { + @ProtoProperties private String _tableName; + @ProtoProperties private List<String> _tableScanColumns; public TableScanNode(int stageId) { super(stageId); } - public TableScanNode(int stageId, String tableName, List<String> tableScanColumns) { + public TableScanNode(int stageId, RelDataType rowType, String tableName, List<String> tableScanColumns) { super(stageId); + super._rowType = rowType; _tableName = tableName; _tableScanColumns = tableScanColumns; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java index 2b35613f7a..e7ef083ded 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java @@ -19,6 +19,8 @@ package org.apache.pinot.query.rules; import com.google.common.collect.ImmutableList; +import java.util.Collections; +import java.util.List; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.hep.HepRelVertex; @@ -27,9 +29,13 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Exchange; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.logical.LogicalExchange; import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.pinot.query.planner.hints.PinotRelationalHints; /** @@ -57,12 +63,26 @@ public class PinotExchangeNodeInsertRule extends RelOptRule { @Override public void onMatch(RelOptRuleCall call) { + // TODO: this only works for single equality JOIN. 
add generic condition parser Join join = call.rel(0); RelNode leftInput = join.getInput(0); RelNode rightInput = join.getInput(1); - RelNode leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON); - RelNode rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED); + RelNode leftExchange; + RelNode rightExchange; + List<RelHint> hints = join.getHints(); + if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) { + int leftOperandIndex = ((RexInputRef) ((RexCall) join.getCondition()).getOperands().get(0)).getIndex(); + int rightOperandIndex = ((RexInputRef) ((RexCall) join.getCondition()).getOperands().get(1)).getIndex() + - join.getLeft().getRowType().getFieldNames().size(); + leftExchange = LogicalExchange.create(leftInput, + RelDistributions.hash(Collections.singletonList(leftOperandIndex))); + rightExchange = LogicalExchange.create(rightInput, + RelDistributions.hash(Collections.singletonList(rightOperandIndex))); + } else { // if (hints.contains(PinotRelationalHints.USE_BROADCAST_JOIN)) + leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON); + rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED); + } RelNode newJoinNode = new LogicalJoin(join.getCluster(), join.getTraitSet(), leftExchange, rightExchange, join.getCondition(), diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java index 1b4e0850ac..63c2fd799f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java @@ -45,6 +45,8 @@ public class PinotQueryRuleSets { CoreRules.FILTER_AGGREGATE_TRANSPOSE, // push filter through set operation CoreRules.FILTER_SET_OP_TRANSPOSE, + // push project through join, + CoreRules.PROJECT_JOIN_TRANSPOSE, // push project through set operation CoreRules.PROJECT_SET_OP_TRANSPOSE, diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java index 60c7cd11af..9f4778743b 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java @@ -19,56 +19,45 @@ package org.apache.pinot.query; import com.google.common.collect.ImmutableList; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.calcite.jdbc.CalciteSchemaBuilder; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelRoot; -import org.apache.calcite.rel.RelWriter; -import org.apache.calcite.rel.externalize.RelXmlWriter; -import org.apache.calcite.sql.SqlExplainLevel; import org.apache.calcite.sql.SqlNode; -import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.transport.ServerInstance; -import org.apache.pinot.query.catalog.PinotCatalog; import org.apache.pinot.query.context.PlannerContext; import org.apache.pinot.query.planner.PlannerUtils; import org.apache.pinot.query.planner.QueryPlan; import org.apache.pinot.query.planner.StageMetadata; -import org.apache.pinot.query.routing.WorkerManager; -import org.apache.pinot.query.type.TypeFactory; -import 
org.apache.pinot.query.type.TypeSystem; import org.testng.Assert; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -public class QueryEnvironmentTest { - private QueryEnvironment _queryEnvironment; +public class QueryEnvironmentTest extends QueryEnvironmentTestBase { - @BeforeClass - public void setUp() { - // the port doesn't matter as we are not actually making a server call. - RoutingManager routingManager = QueryEnvironmentTestUtils.getMockRoutingManager(1, 2); - _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()), - CalciteSchemaBuilder.asRootSchema(new PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())), - new WorkerManager("localhost", 3, routingManager)); + @Test(dataProvider = "testQueryParserDataProvider") + public void testQueryParser(String query, String digest) + throws Exception { + PlannerContext plannerContext = new PlannerContext(); + SqlNode sqlNode = _queryEnvironment.parse(query, plannerContext); + _queryEnvironment.validate(sqlNode); + Assert.assertEquals(sqlNode.toString(), digest); } - @Test - public void testSqlStrings() + @Test(dataProvider = "testQueryDataProvider") + public void testQueryToRel(String query) throws Exception { - testQueryParsing("SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0", - "SELECT *\n" + "FROM `a`\n" + "INNER JOIN `b` ON `a`.`col1` = `b`.`col2`\n" + "WHERE `a`.`col3` >= 0"); + try { + QueryPlan queryPlan = _queryEnvironment.planQuery(query); + Assert.assertNotNull(queryPlan); + } catch (RuntimeException e) { + Assert.fail("failed to plan query: " + query, e); + } } @Test - public void testQueryToStages() + public void testQueryAndAssertStageContentForJoin() throws Exception { - PlannerContext plannerContext = new PlannerContext(); String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2"; QueryPlan queryPlan = _queryEnvironment.planQuery(query); Assert.assertEquals(queryPlan.getQueryStageMap().size(), 4); @@ -96,28 +85,19 @@ public class QueryEnvironmentTest { } @Test - public void testQueryToRel() - throws Exception { - PlannerContext plannerContext = new PlannerContext(); - String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0"; - SqlNode parsed = _queryEnvironment.parse(query, plannerContext); - SqlNode validated = _queryEnvironment.validate(parsed); - RelRoot relRoot = _queryEnvironment.toRelation(validated, plannerContext); - RelNode optimized = _queryEnvironment.optimize(relRoot, plannerContext); - - // Assert that relational plan can be written into a ALL-ATTRIBUTE digest. 
- StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - RelWriter planWriter = new RelXmlWriter(pw, SqlExplainLevel.ALL_ATTRIBUTES); - optimized.explain(planWriter); - Assert.assertNotNull(sw.toString()); + public void testQueryProjectFilterPushdownForJoin() { + String query = "SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON a.col1 = b.col2 " + + "WHERE a.col3 >= 0 AND a.col2 IN ('a', 'b') AND b.col3 < 0"; + QueryPlan queryPlan = _queryEnvironment.planQuery(query); + Assert.assertEquals(queryPlan.getQueryStageMap().size(), 4); + Assert.assertEquals(queryPlan.getStageMetadataMap().size(), 4); } - private void testQueryParsing(String query, String digest) - throws Exception { - PlannerContext plannerContext = new PlannerContext(); - SqlNode sqlNode = _queryEnvironment.parse(query, plannerContext); - _queryEnvironment.validate(sqlNode); - Assert.assertEquals(sqlNode.toString(), digest); + @DataProvider(name = "testQueryParserDataProvider") + private Object[][] provideQueriesAndDigest() { + return new Object[][] { + new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0", + "SELECT *\n" + "FROM `a`\n" + "INNER JOIN `b` ON `a`.`col1` = `b`.`col2`\n" + "WHERE `a`.`col3` >= 0"}, + }; } } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java new file mode 100644 index 0000000000..40841d7c72 --- /dev/null +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query; + +import org.apache.calcite.jdbc.CalciteSchemaBuilder; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.query.catalog.PinotCatalog; +import org.apache.pinot.query.routing.WorkerManager; +import org.apache.pinot.query.type.TypeFactory; +import org.apache.pinot.query.type.TypeSystem; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; + + +public class QueryEnvironmentTestBase { + protected QueryEnvironment _queryEnvironment; + + @BeforeClass + public void setUp() { + // the port doesn't matter as we are not actually making a server call. 
+ RoutingManager routingManager = QueryEnvironmentTestUtils.getMockRoutingManager(1, 2); + _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()), + CalciteSchemaBuilder.asRootSchema(new PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())), + new WorkerManager("localhost", 3, routingManager)); + } + + @DataProvider(name = "testQueryDataProvider") + protected Object[][] provideQueries() { + return new Object[][] { + new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2"}, + new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0"}, + new Object[]{"SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2 " + + "WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0"}, + }; + } +} diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/stage/SerDeUtilsTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/stage/SerDeUtilsTest.java new file mode 100644 index 0000000000..21031cd303 --- /dev/null +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/stage/SerDeUtilsTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.planner.stage; + +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.proto.Plan; +import org.apache.pinot.query.QueryEnvironmentTestBase; +import org.apache.pinot.query.planner.QueryPlan; +import org.apache.pinot.query.planner.serde.ProtoProperties; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class SerDeUtilsTest extends QueryEnvironmentTestBase { + + @Test(dataProvider = "testQueryDataProvider") + public void testQueryStagePlanSerDe(String query) + throws Exception { + QueryPlan queryPlan = _queryEnvironment.planQuery(query); + for (StageNode stageNode : queryPlan.getQueryStageMap().values()) { + Plan.StageNode serializedStageNode = StageNodeSerDeUtils.serializeStageNode((AbstractStageNode) stageNode); + StageNode deserializedStageNode = StageNodeSerDeUtils.deserializeStageNode(serializedStageNode); + Assert.assertTrue(isObjectEqual(stageNode, deserializedStageNode)); + } + } + + @SuppressWarnings({"rawtypes"}) + private boolean isObjectEqual(Object left, Object right) + throws IllegalAccessException { + Class<?> clazz = left.getClass(); + for (Field field : clazz.getDeclaredFields()) { + if (field.isAnnotationPresent(ProtoProperties.class)) { + field.setAccessible(true); + Object l = field.get(left); + Object r = field.get(right); + if (l instanceof List) { + if (((List) l).size() != ((List) r).size()) { + return false; + } + for (int i = 0; i < ((List) l).size(); i++) { + if (!isObjectEqual(((List) l).get(i), ((List) r).get(i))) { + return false; + } + } + } else if (l instanceof Map) { + if (((Map) l).size() != ((Map) r).size()) { + return false; + } + for (Object key : ((Map) l).keySet()) { + if (!isObjectEqual(((Map) l).get(key), ((Map) r).get(key))) { + return false; + } + } + } else { + if (!(l == null && r == null || l != null && l.equals(r) || isObjectEqual(l, r))) { + return false; + } + } + } + } + return true; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 2968ff9a23..f33cde43c2 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -31,7 +31,7 @@ import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.mailbox.GrpcMailboxService; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.StageMetadata; -import org.apache.pinot.query.planner.nodes.MailboxSendNode; +import org.apache.pinot.query.planner.stage.MailboxSendNode; import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; @@ -100,8 +100,8 @@ public class QueryRunner { StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId()); MailboxSendOperator mailboxSendOperator = new MailboxSendOperator(_mailboxService, dataTable, receivingStageMetadata.getServerInstances(), - sendNode.getExchangeType(), _hostname, _port, serverQueryRequest.getRequestId(), - sendNode.getStageId()); + sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostname, _port, + serverQueryRequest.getRequestId(), sendNode.getStageId()); mailboxSendOperator.nextBlock(); } else { 
_workerExecutor.processQuery(distributedStagePlan, requestMetadataMap, executorService); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java index 85c0f108b4..9a66bf86c1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java @@ -30,13 +30,15 @@ import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.util.trace.TraceRunnable; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.StageMetadata; -import org.apache.pinot.query.planner.nodes.JoinNode; -import org.apache.pinot.query.planner.nodes.MailboxReceiveNode; -import org.apache.pinot.query.planner.nodes.MailboxSendNode; -import org.apache.pinot.query.planner.nodes.StageNode; +import org.apache.pinot.query.planner.stage.FilterNode; +import org.apache.pinot.query.planner.stage.JoinNode; +import org.apache.pinot.query.planner.stage.MailboxReceiveNode; +import org.apache.pinot.query.planner.stage.MailboxSendNode; +import org.apache.pinot.query.planner.stage.ProjectNode; +import org.apache.pinot.query.planner.stage.StageNode; import org.apache.pinot.query.runtime.blocks.DataTableBlock; import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; -import org.apache.pinot.query.runtime.operator.BroadcastJoinOperator; +import org.apache.pinot.query.runtime.operator.HashJoinOperator; import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; @@ -98,22 +100,27 @@ public class WorkerQueryExecutor { private BaseOperator<DataTableBlock> getOperator(long requestId, StageNode stageNode, Map<Integer, StageMetadata> metadataMap) { // TODO: optimize this into a framework. 
(physical planner) - if (stageNode instanceof MailboxSendNode) { - MailboxSendNode sendNode = (MailboxSendNode) stageNode; - BaseOperator<DataTableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap); - StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId()); - return new MailboxSendOperator(_mailboxService, nextOperator, receivingStageMetadata.getServerInstances(), - sendNode.getExchangeType(), _hostName, _port, requestId, sendNode.getStageId()); - } else if (stageNode instanceof MailboxReceiveNode) { + if (stageNode instanceof MailboxReceiveNode) { MailboxReceiveNode receiveNode = (MailboxReceiveNode) stageNode; List<ServerInstance> sendingInstances = metadataMap.get(receiveNode.getSenderStageId()).getServerInstances(); return new MailboxReceiveOperator(_mailboxService, RelDistribution.Type.ANY, sendingInstances, _hostName, _port, requestId, receiveNode.getSenderStageId()); + } else if (stageNode instanceof MailboxSendNode) { + MailboxSendNode sendNode = (MailboxSendNode) stageNode; + BaseOperator<DataTableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap); + StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId()); + return new MailboxSendOperator(_mailboxService, nextOperator, receivingStageMetadata.getServerInstances(), + sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostName, _port, requestId, + sendNode.getStageId()); } else if (stageNode instanceof JoinNode) { JoinNode joinNode = (JoinNode) stageNode; BaseOperator<DataTableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap); BaseOperator<DataTableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap); - return new BroadcastJoinOperator(leftOperator, rightOperator, joinNode.getCriteria()); + return new HashJoinOperator(leftOperator, rightOperator, joinNode.getCriteria()); + } else if (stageNode instanceof FilterNode) { + throw new UnsupportedOperationException("Unsupported!"); + } else if (stageNode instanceof ProjectNode) { + throw new UnsupportedOperationException("Unsupported!"); } else { throw new UnsupportedOperationException( String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName())); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BroadcastJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java similarity index 96% rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BroadcastJoinOperator.java rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index db5bb5289d..6cf9fddd12 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BroadcastJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -28,8 +28,8 @@ import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; -import org.apache.pinot.query.planner.nodes.JoinNode; import org.apache.pinot.query.planner.partitioning.KeySelector; +import org.apache.pinot.query.planner.stage.JoinNode; import org.apache.pinot.query.runtime.blocks.DataTableBlock; import 
org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; @@ -42,7 +42,7 @@ import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; * * <p>For each of the data block received from the left table, it will generate a joint data block. */ -public class BroadcastJoinOperator extends BaseOperator<DataTableBlock> { +public class HashJoinOperator extends BaseOperator<DataTableBlock> { private static final String OPERATOR_NAME = "BroadcastJoinOperator"; private static final String EXPLAIN_NAME = "BROADCAST_JOIN"; @@ -57,7 +57,7 @@ public class BroadcastJoinOperator extends BaseOperator<DataTableBlock> { private KeySelector<Object[], Object> _leftKeySelector; private KeySelector<Object[], Object> _rightKeySelector; - public BroadcastJoinOperator(BaseOperator<DataTableBlock> leftTableOperator, + public HashJoinOperator(BaseOperator<DataTableBlock> leftTableOperator, BaseOperator<DataTableBlock> rightTableOperator, List<JoinNode.JoinClause> criteria) { // TODO: this assumes right table is broadcast. _leftKeySelector = criteria.get(0).getLeftJoinKeySelector(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 3971dfb325..3dd9c5ff77 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.protobuf.ByteString; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -30,10 +31,12 @@ import org.apache.pinot.common.proto.Mailbox; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.mailbox.StringMailboxIdentifier; +import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.runtime.blocks.DataTableBlock; import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; import org.slf4j.Logger; @@ -49,10 +52,11 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { private static final String EXPLAIN_NAME = "MAILBOX_SEND"; private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE = ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED, - RelDistribution.Type.BROADCAST_DISTRIBUTED); + RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED); private final List<ServerInstance> _receivingStageInstances; private final RelDistribution.Type _exchangeType; + private final KeySelector<Object[], Object> _keySelector; private final String _serverHostName; private final int _serverPort; private final long _jobId; @@ -63,11 +67,13 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, BaseOperator<DataTableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances, - 
RelDistribution.Type exchangeType, String hostName, int port, long jobId, int stageId) { + RelDistribution.Type exchangeType, KeySelector<Object[], Object> keySelector, String hostName, int port, + long jobId, int stageId) { _mailboxService = mailboxService; _dataTableBlockBaseOperator = dataTableBlockBaseOperator; _receivingStageInstances = receivingStageInstances; _exchangeType = exchangeType; + _keySelector = keySelector; _serverHostName = hostName; _serverPort = port; _jobId = jobId; @@ -82,12 +88,13 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { * creation of MailboxSendOperator we should not use this API. */ public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataTable dataTable, - List<ServerInstance> receivingStageInstances, RelDistribution.Type exchangeType, String hostName, int port, - long jobId, int stageId) { + List<ServerInstance> receivingStageInstances, RelDistribution.Type exchangeType, + KeySelector<Object[], Object> keySelector, String hostName, int port, long jobId, int stageId) { _mailboxService = mailboxService; _dataTable = dataTable; _receivingStageInstances = receivingStageInstances; _exchangeType = exchangeType; + _keySelector = keySelector; _serverHostName = hostName; _serverPort = port; _jobId = jobId; @@ -115,13 +122,16 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { protected DataTableBlock getNextBlock() { DataTable dataTable; DataTableBlock dataTableBlock = null; + boolean isEndOfStream; if (_dataTableBlockBaseOperator != null) { dataTableBlock = _dataTableBlockBaseOperator.nextBlock(); dataTable = dataTableBlock.getDataTable(); + isEndOfStream = DataTableBlockUtils.isEndOfStream(dataTableBlock); } else { dataTable = _dataTable; + isEndOfStream = true; } - boolean isEndOfStream = dataTableBlock == null || DataTableBlockUtils.isEndOfStream(dataTableBlock); + try { switch (_exchangeType) { // TODO: random and singleton distribution should've been selected in planning phase. @@ -142,6 +152,13 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { } break; case HASH_DISTRIBUTED: + // TODO: ensure that server instance list is sorted using same function in sender. 
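To make the hash-distributed send path easier to follow, the sketch below shows the core of partitioning rows into one bucket per receiver. It is a simplified stand-alone version under stated assumptions: a plain List<Object[]> replaces DataTable, a Function replaces KeySelector, and Math.floorMod is used in this sketch so that negative hash codes still map into the valid bucket range.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    public class HashPartitionSketch {
      // Splits rows into one bucket per receiver, keyed by a hash of the partition key.
      static List<List<Object[]>> partition(List<Object[]> rows,
          Function<Object[], Object> keyExtractor, int numPartitions) {
        List<List<Object[]>> buckets = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
          buckets.add(new ArrayList<>());
        }
        for (Object[] row : rows) {
          Object key = keyExtractor.apply(row);
          // floorMod keeps the bucket index in [0, numPartitions) even for negative hash codes.
          buckets.get(Math.floorMod(key.hashCode(), numPartitions)).add(row);
        }
        return buckets;
      }

      public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
            new Object[]{"a", 1}, new Object[]{"b", 2}, new Object[]{"c", 3}, new Object[]{"a", 4});
        // Partition on column 0: rows sharing a join key fall into the same bucket, so both
        // sides of a hash join can be shuffled consistently to the same receiving worker.
        List<List<Object[]>> buckets = partition(rows, row -> row[0], 2);
        for (int i = 0; i < buckets.size(); i++) {
          System.out.println("bucket " + i + ": " + buckets.get(i).size() + " rows");
        }
      }
    }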
+ List<DataTable> dataTableList = constructPartitionedDataBlock(dataTable, _keySelector, + _receivingStageInstances.size()); + for (int i = 0; i < _receivingStageInstances.size(); i++) { + sendDataTableBlock(_receivingStageInstances.get(i), dataTableList.get(i), isEndOfStream); + } + break; case RANGE_DISTRIBUTED: case ROUND_ROBIN_DISTRIBUTED: case ANY: @@ -154,6 +171,31 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { return dataTableBlock; } + private static List<DataTable> constructPartitionedDataBlock(DataTable dataTable, + KeySelector<Object[], Object> keySelector, int partitionSize) + throws Exception { + List<List<Object[]>> temporaryRows = new ArrayList<>(partitionSize); + for (int i = 0; i < partitionSize; i++) { + temporaryRows.add(new ArrayList<>()); + } + for (int rowId = 0; rowId < dataTable.getNumberOfRows(); rowId++) { + Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId); + Object key = keySelector.getKey(row); + // TODO: support other partitioning algorithm + temporaryRows.get(hashToIndex(key, partitionSize)).add(row); + } + List<DataTable> dataTableList = new ArrayList<>(partitionSize); + for (int i = 0; i < partitionSize; i++) { + List<Object[]> objects = temporaryRows.get(i); + dataTableList.add(SelectionOperatorUtils.getDataTableFromRows(objects, dataTable.getDataSchema())); + } + return dataTableList; + } + + private static int hashToIndex(Object key, int partitionSize) { + return (key.hashCode()) % partitionSize; + } + private void sendDataTableBlock(ServerInstance serverInstance, DataTable dataTable, boolean isEndOfStream) throws IOException { String mailboxId = toMailboxId(serverInstance); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java index f9ecf7f089..828a969dd0 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java @@ -22,7 +22,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.planner.StageMetadata; -import org.apache.pinot.query.planner.nodes.StageNode; +import org.apache.pinot.query.planner.stage.StageNode; /** diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java index 358ebb8465..034bf561a2 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java @@ -25,8 +25,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.planner.StageMetadata; -import org.apache.pinot.query.planner.nodes.AbstractStageNode; -import org.apache.pinot.query.planner.nodes.SerDeUtils; +import org.apache.pinot.query.planner.stage.AbstractStageNode; +import org.apache.pinot.query.planner.stage.StageNodeSerDeUtils; import org.apache.pinot.query.routing.WorkerInstance; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; @@ -43,7 +43,7 @@ public class QueryPlanSerDeUtils { public static 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
index f9ecf7f089..828a969dd0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.StageMetadata;
-import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.planner.stage.StageNode;


 /**
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 358ebb8465..034bf561a2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -25,8 +25,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.StageMetadata;
-import org.apache.pinot.query.planner.nodes.AbstractStageNode;
-import org.apache.pinot.query.planner.nodes.SerDeUtils;
+import org.apache.pinot.query.planner.stage.AbstractStageNode;
+import org.apache.pinot.query.planner.stage.StageNodeSerDeUtils;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -43,7 +43,7 @@ public class QueryPlanSerDeUtils {
   public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
     DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId());
     distributedStagePlan.setServerInstance(stringToInstance(stagePlan.getInstanceId()));
-    distributedStagePlan.setStageRoot(SerDeUtils.deserializeStageNode(stagePlan.getStageRoot()));
+    distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot()));
     Map<Integer, Worker.StageMetadata> metadataMap = stagePlan.getStageMetadataMap();
     distributedStagePlan.getMetadataMap().putAll(protoMapToStageMetadataMap(metadataMap));
     return distributedStagePlan;
@@ -53,7 +53,7 @@ public class QueryPlanSerDeUtils {
     return Worker.StagePlan.newBuilder()
         .setStageId(distributedStagePlan.getStageId())
         .setInstanceId(instanceToString(distributedStagePlan.getServerInstance()))
-        .setStageRoot(SerDeUtils.serializeStageNode((AbstractStageNode) distributedStagePlan.getStageRoot()))
+        .setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractStageNode) distributedStagePlan.getStageRoot()))
         .putAllStageMetadata(stageMetadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build();
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 306faa2598..5f17e83305 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -29,11 +29,12 @@ import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.request.QuerySource;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
-import org.apache.pinot.query.planner.nodes.CalcNode;
-import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
-import org.apache.pinot.query.planner.nodes.MailboxSendNode;
-import org.apache.pinot.query.planner.nodes.StageNode;
-import org.apache.pinot.query.planner.nodes.TableScanNode;
+import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -88,22 +89,28 @@ public class ServerRequestUtils {
   }

   private static void walkStageTree(StageNode node, PinotQuery pinotQuery) {
-    if (node instanceof CalcNode) {
-      // TODO: add conversion for CalcNode, specifically filter/alias/...
-    } else if (node instanceof TableScanNode) {
+    // this walkStageTree should only be a sequential walk.
+    for (StageNode child : node.getInputs()) {
+      walkStageTree(child, pinotQuery);
+    }
+    if (node instanceof TableScanNode) {
       TableScanNode tableScanNode = (TableScanNode) node;
       DataSource dataSource = new DataSource();
       dataSource.setTableName(tableScanNode.getTableName());
       pinotQuery.setDataSource(dataSource);
       pinotQuery.setSelectList(tableScanNode.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression)
           .collect(Collectors.toList()));
-    } else if (node instanceof MailboxSendNode || node instanceof MailboxReceiveNode) {
-      // ignore for now. continue to child.
+    } else if (node instanceof FilterNode) {
+      pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression(
+          ((FilterNode) node).getCondition(), pinotQuery));
+    } else if (node instanceof ProjectNode) {
+      pinotQuery.setSelectList(CalciteRexExpressionParser.convertSelectList(
+          ((ProjectNode) node).getProjects(), pinotQuery));
+    } else if (node instanceof MailboxSendNode) {
+      // TODO: MailboxSendNode should be the root of the leaf stage. but ignore for now since it is handle seperately
+      // in QueryRunner as a single step sender.
    } else {
       throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
     }
-    for (StageNode child : node.getInputs()) {
-      walkStageTree(child, pinotQuery);
-    }
   }
 }
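The reworked walkStageTree above is a bottom-up (post-order) walk: children are converted first, so the TableScanNode fills in the data source and select list before a parent ProjectNode or FilterNode overrides them via CalciteRexExpressionParser. Below is a minimal stand-alone sketch of that dispatch pattern; the node types are hypothetical stand-ins, not Pinot's StageNode hierarchy, and the "query" is just a StringBuilder.

import java.util.Collections;
import java.util.List;

// Illustrative only: post-order conversion of a tiny plan tree into a mutable query,
// in the spirit of walkStageTree above.
abstract class DemoNode {
  List<DemoNode> getInputs() {
    return Collections.emptyList();
  }
}

class DemoScanNode extends DemoNode {
  final String _table;
  DemoScanNode(String table) {
    _table = table;
  }
}

class DemoFilterNode extends DemoNode {
  final DemoNode _input;
  final String _condition;
  DemoFilterNode(DemoNode input, String condition) {
    _input = input;
    _condition = condition;
  }
  @Override
  List<DemoNode> getInputs() {
    return Collections.singletonList(_input);
  }
}

final class DemoStageWalker {
  private DemoStageWalker() {
  }

  static void walk(DemoNode node, StringBuilder query) {
    // Children first: the scan establishes the base query before a parent filter refines it.
    for (DemoNode child : node.getInputs()) {
      walk(child, query);
    }
    if (node instanceof DemoScanNode) {
      query.append("SELECT * FROM ").append(((DemoScanNode) node)._table);
    } else if (node instanceof DemoFilterNode) {
      query.append(" WHERE ").append(((DemoFilterNode) node)._condition);
    } else {
      throw new UnsupportedOperationException("Unsupported node: " + node);
    }
  }
}

Walking new DemoFilterNode(new DemoScanNode("a"), "col3 >= 0") produces "SELECT * FROM a WHERE col3 >= 0", which mirrors how a pushed-down filter on a leaf stage ends up in the PinotQuery handed to the server.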
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 3200b317bb..7ea69b8d21 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -32,7 +32,7 @@ import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
-import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
 import org.apache.pinot.query.runtime.blocks.DataTableBlock;
 import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 9de2db834f..2316195bce 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -36,7 +36,7 @@ import org.apache.pinot.query.QueryEnvironmentTestUtils;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.blocks.DataTableBlock;
 import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
@@ -47,6 +47,7 @@ import org.apache.pinot.query.service.QueryDispatcher;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;

 import static org.apache.pinot.core.query.selection.SelectionOperatorUtils.extractRowFromDataTable;
@@ -101,73 +102,9 @@ public class QueryRunnerTest {
     _mailboxService.shutdown();
   }

-  @Test
-  public void testRunningTableScanOnlyQuery()
-      throws Exception {
-    QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM b");
-    int stageRoodId = QueryEnvironmentTestUtils.getTestStageByServerCount(queryPlan, 1);
-    Map<String, String> requestMetadataMap =
-        ImmutableMap.of("REQUEST_ID", String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
-
-    ServerInstance serverInstance =
-        queryPlan.getStageMetadataMap().get(stageRoodId).getServerInstances().get(0);
-    DistributedStagePlan distributedStagePlan =
-        QueryDispatcher.constructDistributedStagePlan(queryPlan, stageRoodId, serverInstance);
-
-    MailboxReceiveOperator mailboxReceiveOperator =
-        createReduceStageOperator(queryPlan.getStageMetadataMap().get(stageRoodId).getServerInstances(),
-            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), stageRoodId, _reducerGrpcPort);
-
-    // execute this single stage.
-    _servers.get(serverInstance).processQuery(distributedStagePlan, requestMetadataMap);
-
-    DataTableBlock dataTableBlock;
-    // get the block back and it should have 5 rows
-    dataTableBlock = mailboxReceiveOperator.nextBlock();
-    Assert.assertEquals(dataTableBlock.getDataTable().getNumberOfRows(), 5);
-    // next block should be null as all servers finished sending.
-    dataTableBlock = mailboxReceiveOperator.nextBlock();
-    Assert.assertTrue(DataTableBlockUtils.isEndOfStream(dataTableBlock));
-  }
-
-  @Test
-  public void testRunningTableScanMultipleServer()
-      throws Exception {
-    QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a");
-    int stageRoodId = QueryEnvironmentTestUtils.getTestStageByServerCount(queryPlan, 2);
-    Map<String, String> requestMetadataMap =
-        ImmutableMap.of("REQUEST_ID", String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
-
-    for (ServerInstance serverInstance : queryPlan.getStageMetadataMap().get(stageRoodId).getServerInstances()) {
-      DistributedStagePlan distributedStagePlan =
-          QueryDispatcher.constructDistributedStagePlan(queryPlan, stageRoodId, serverInstance);
-
-      // execute this single stage.
-      _servers.get(serverInstance).processQuery(distributedStagePlan, requestMetadataMap);
-    }
-
-    MailboxReceiveOperator mailboxReceiveOperator =
-        createReduceStageOperator(queryPlan.getStageMetadataMap().get(stageRoodId).getServerInstances(),
-            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), stageRoodId, _reducerGrpcPort);
-
-    int count = 0;
-    int rowCount = 0;
-    DataTableBlock dataTableBlock;
-    while (count < 2) { // we have 2 servers sending data.
-      dataTableBlock = mailboxReceiveOperator.nextBlock();
-      rowCount += dataTableBlock.getDataTable().getNumberOfRows();
-      count++;
-    }
-    // assert that all table A segments returned successfully.
-    Assert.assertEquals(rowCount, 15);
-    // assert that the next block is null (e.g. finished receiving).
-    dataTableBlock = mailboxReceiveOperator.nextBlock();
-    Assert.assertTrue(DataTableBlockUtils.isEndOfStream(dataTableBlock));
-  }
-
-  @Test
-  public void testJoin()
-      throws Exception {
-    QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a JOIN b on a.col1 = b.col2");
+  @Test(dataProvider = "testDataWithSqlToFinalRowCount")
+  public void testSqlWithFinalRowCountChecker(String sql, int expectedRowCount) {
+    QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
     Map<String, String> requestMetadataMap =
         ImmutableMap.of("REQUEST_ID", String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
     MailboxReceiveOperator mailboxReceiveOperator = null;
@@ -187,85 +124,52 @@ public class QueryRunnerTest {
     }
     Preconditions.checkNotNull(mailboxReceiveOperator);

-    int count = 0;
-    int rowCount = 0;
-    List<Object[]> resultRows = new ArrayList<>();
-    DataTableBlock dataTableBlock;
-    while (count < 2) { // we have 2 servers sending data.
-      dataTableBlock = mailboxReceiveOperator.nextBlock();
-      if (dataTableBlock.getDataTable() != null) {
-        DataTable dataTable = dataTableBlock.getDataTable();
-        int numRows = dataTable.getNumberOfRows();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          resultRows.add(extractRowFromDataTable(dataTable, rowId));
-        }
-        rowCount += numRows;
-      }
-      count++;
-    }
-
-    // Assert that each of the 5 categories from left table is joined with right table.
-    // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
-    // thus the final JOIN result will be 15 x 1 = 15.
-    Assert.assertEquals(rowCount, 15);
-
-    // assert that the next block is null (e.g. finished receiving).
-    dataTableBlock = mailboxReceiveOperator.nextBlock();
-    Assert.assertTrue(DataTableBlockUtils.isEndOfStream(dataTableBlock));
+    List<Object[]> resultRows = reduceMailboxReceive(mailboxReceiveOperator);
+    Assert.assertEquals(resultRows.size(), expectedRowCount);
   }

-  @Test
-  public void testMultipleJoin()
-      throws Exception {
-    QueryPlan queryPlan =
-        _queryEnvironment.planQuery("SELECT * FROM a JOIN b ON a.col1 = b.col2 " + "JOIN c ON a.col3 = c.col3");
-    Map<String, String> requestMetadataMap =
-        ImmutableMap.of("REQUEST_ID", String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
-    MailboxReceiveOperator mailboxReceiveOperator = null;
-    for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
-      if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) {
-        MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(stageId);
-        mailboxReceiveOperator = createReduceStageOperator(
-            queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
-            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), reduceNode.getSenderStageId(), _reducerGrpcPort);
-      } else {
-        for (ServerInstance serverInstance : queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
-          DistributedStagePlan distributedStagePlan =
-              QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId, serverInstance);
-          _servers.get(serverInstance).processQuery(distributedStagePlan, requestMetadataMap);
-        }
-      }
-    }
-    Preconditions.checkNotNull(mailboxReceiveOperator);
+  @DataProvider(name = "testDataWithSqlToFinalRowCount")
+  private Object[][] provideTestSqlAndRowCount() {
+    return new Object[][] {
+        new Object[]{"SELECT * FROM b", 5},
+        new Object[]{"SELECT * FROM a", 15},
+
+        // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
+        // thus the final JOIN result will be 15 x 1 = 15.
+        // Next join with table C which has (5 on server1 and 10 on server2), since data is identical. each of the row
+        // of the A JOIN B will have identical value of col3 as table C.col3 has. Since the values are cycling between
+        // (1, 2, 42, 1, 2). we will have 6 1s, 6 2s, and 3 42s, total result count will be 36 + 36 + 9 = 81
+        new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 JOIN c ON a.col3 = c.col3", 81},
+
+        // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
+        // thus the final JOIN result will be 15 x 1 = 15.
+        new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col2", 15},

+        // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
+        // but only 1 out of 5 rows from table A will be selected out; and all in table B will be selected.
+        // thus the final JOIN result will be 1 x 3 x 1 = 3.
+        new Object[]{"SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON a.col1 = b.col2 " + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0", 3},
+    };
+  }

-    int count = 0;
-    int rowCount = 0;
+  protected static List<Object[]> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator) {
     List<Object[]> resultRows = new ArrayList<>();
     DataTableBlock dataTableBlock;
-    while (count < 2) { // we have 2 servers sending data.
+    while (true) {
       dataTableBlock = mailboxReceiveOperator.nextBlock();
+      if (DataTableBlockUtils.isEndOfStream(dataTableBlock)) {
+        break;
+      }
       if (dataTableBlock.getDataTable() != null) {
         DataTable dataTable = dataTableBlock.getDataTable();
         int numRows = dataTable.getNumberOfRows();
         for (int rowId = 0; rowId < numRows; rowId++) {
           resultRows.add(extractRowFromDataTable(dataTable, rowId));
         }
-        rowCount += numRows;
       }
-      count++;
     }
-
-    // Assert that each of the 5 categories from left table is joined with right table.
-    // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
-    // thus the final JOIN result will be 15 x 1 = 15.
-    // Next join with table C which has (5 on server1 and 10 on server2), since data is identical. each of the row of
-    // the A JOIN B will have identical value of col3 as table C.col3 has. Since the values are cycling between
-    // (1, 2, 42, 1, 2). we will have 6 1s, 6 2s, and 3 42s, total result count will be 36 + 36 + 9 = 81
-    Assert.assertEquals(rowCount, 81);
-
-    // assert that the next block is null (e.g. finished receiving).
-    dataTableBlock = mailboxReceiveOperator.nextBlock();
-    Assert.assertTrue(DataTableBlockUtils.isEndOfStream(dataTableBlock));
+    return resultRows;
   }

   protected MailboxReceiveOperator createReduceStageOperator(List<ServerInstance> sendingInstances, long jobId,
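The expected row counts in the new data provider follow from the test data layout spelled out in the comments above. As a back-of-the-envelope check of the 81-row case (assuming, per those comments, that the a-to-b join on col1 = b.col2 is one-to-one and that col3 cycles through (1, 2, 42, 1, 2) on both sides of the join with c), a tiny throwaway program reproduces the arithmetic:

// Illustrative only: reproduces the 36 + 36 + 9 = 81 arithmetic from the comment above.
public class JoinRowCountSanityCheck {
  public static void main(String[] args) {
    // Over 15 rows cycling through (1, 2, 42, 1, 2), each side has 6 rows with col3 = 1,
    // 6 rows with col3 = 2 and 3 rows with col3 = 42.
    int[] matchesPerCol3Value = {6, 6, 3};
    int expected = 0;
    for (int count : matchesPerCol3Value) {
      expected += count * count; // every pairing of equal col3 values joins
    }
    System.out.println(expected); // 81
  }
}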
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index eeed6d20d9..735c54dd0c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -24,6 +24,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
@@ -32,7 +33,7 @@ import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryEnvironmentTestUtils;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
-import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
@@ -41,12 +42,14 @@ import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;

 import static org.mockito.ArgumentMatchers.any;


 public class QueryServerTest {
+  private static final Random RANDOM_REQUEST_ID_GEN = new Random();
   private static final int QUERY_SERVER_COUNT = 2;
   private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
   private final Map<Integer, ServerInstance> _queryServerInstanceMap = new HashMap<>();
@@ -84,13 +87,14 @@ public class QueryServerTest {
   }

   @SuppressWarnings("unchecked")
-  @Test
-  public void testWorkerAcceptsWorkerRequestCorrect()
+  @Test(dataProvider = "testDataWithSqlToCompiledAsWorkerRequest")
+  public void testWorkerAcceptsWorkerRequestCorrect(String sql)
       throws Exception {
-    QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a JOIN b ON a.col1 = b.col2");
+    QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
     for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
       if (stageId > 0) { // we do not test reduce stage.
+        // only get one worker request out.
         Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, stageId);

         // submit the request for testing.
@@ -99,24 +103,39 @@ public class QueryServerTest {
         StageMetadata stageMetadata = queryPlan.getStageMetadataMap().get(stageId);

         // ensure mock query runner received correctly deserialized payload.
+        QueryRunner mockRunner = _queryRunnerMap.get(
+            Integer.parseInt(queryRequest.getMetadataOrThrow("SERVER_INSTANCE_PORT")));
+        String requestIdStr = queryRequest.getMetadataOrThrow("REQUEST_ID");
+
         // since submitRequest is async, we need to wait for the mockRunner to receive the query payload.
-        QueryRunner mockRunner = _queryRunnerMap.get(stageMetadata.getServerInstances().get(0).getPort());
         TestUtils.waitForCondition(aVoid -> {
           try {
             Mockito.verify(mockRunner).processQuery(Mockito.argThat(distributedStagePlan -> {
               StageNode stageNode = queryPlan.getQueryStageMap().get(stageId);
-              return isStageNodesEqual(stageNode, distributedStagePlan.getStageRoot()) && isMetadataMapsEqual(
-                  stageMetadata, distributedStagePlan.getMetadataMap().get(stageId));
-            }), any(ExecutorService.class), any(Map.class));
+              return isStageNodesEqual(stageNode, distributedStagePlan.getStageRoot())
+                  && isMetadataMapsEqual(stageMetadata, distributedStagePlan.getMetadataMap().get(stageId));
+            }), any(ExecutorService.class), Mockito.argThat(requestMetadataMap ->
+                requestIdStr.equals(requestMetadataMap.get("REQUEST_ID"))));
             return true;
           } catch (Throwable t) {
             return false;
           }
-        }, 1000L, "Error verifying mock QueryRunner intercepted query payload!");
+        }, 10000L, "Error verifying mock QueryRunner intercepted query payload!");
       }
     }
   }
+  @DataProvider(name = "testDataWithSqlToCompiledAsWorkerRequest")
+  private Object[][] provideTestSqlToCompiledToWorkerRequest() {
+    return new Object[][] {
+        new Object[]{"SELECT * FROM b"},
+        new Object[]{"SELECT * FROM a"},
+        new Object[]{"SELECT * FROM a JOIN b ON a.col3 = b.col3"},
+        new Object[]{"SELECT a.col1, a.ts, c.col2, c.col3 FROM a JOIN c ON a.col1 = c.col2 " + " WHERE (a.col3 >= 0 OR a.col2 = 'foo') AND c.col3 >= 0"},
+    };
+  }
+
   private static boolean isMetadataMapsEqual(StageMetadata left, StageMetadata right) {
     return left.getServerInstances().equals(right.getServerInstances())
         && left.getServerInstanceToSegmentsMap().equals(right.getServerInstanceToSegmentsMap())
@@ -124,6 +143,8 @@ public class QueryServerTest {
   }

   private static boolean isStageNodesEqual(StageNode left, StageNode right) {
+    // This only checks the stage tree structure is correct. because the input/stageId fields are not
+    // part of the generic proto ser/de; which is tested in query planner.
     if (left.getStageId() != right.getStageId() || left.getClass() != right.getClass()
         || left.getInputs().size() != right.getInputs().size()) {
       return false;
@@ -153,6 +174,7 @@ public class QueryServerTest {
     return Worker.QueryRequest.newBuilder().setStagePlan(QueryPlanSerDeUtils.serialize(
         QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId, serverInstance)))
+        .putMetadata("REQUEST_ID", String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()))
+        .putMetadata("SERVER_INSTANCE_HOST", serverInstance.getHostname())
         .putMetadata("SERVER_INSTANCE_PORT", String.valueOf(serverInstance.getPort())).build();
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org