This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d4a64e3853 add project/transform operator to multistage engine (#8967) d4a64e3853 is described below commit d4a64e3853102b220cde905f79eaede0054bc43b Author: Rong Rong <walterddr.walter...@gmail.com> AuthorDate: Thu Jun 30 21:26:35 2022 -0700 add project/transform operator to multistage engine (#8967) * initial to add transform operator - only reference is supported, no call yet - should work with any transform * also add transform operator with function call * fix function resolution Co-authored-by: Rong Rong <ro...@startree.ai> --- .../pinot/query/planner/logical/StagePlanner.java | 1 + .../runtime/executor/WorkerQueryExecutor.java | 5 +- .../query/runtime/operator/AggregateOperator.java | 3 + .../query/runtime/operator/OperatorUtils.java | 67 ++++++++ .../query/runtime/operator/TransformOperator.java | 185 +++++++++++++++++++++ .../pinot/query/runtime/QueryRunnerTest.java | 19 ++- 6 files changed, 278 insertions(+), 2 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java index 02c2e7fb28..cf0b218708 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java @@ -95,6 +95,7 @@ public class StagePlanner { } // non-threadsafe + // TODO: add dataSchema (extracted from RelNode schema) to the StageNode. private StageNode walkRelPlan(RelNode node, int currentStageId) { if (isExchangeNode(node)) { // 1. exchangeNode always have only one input, get its input converted as a new stage root. 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java index 379d7bdbd6..009ebcdc3e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java @@ -43,6 +43,7 @@ import org.apache.pinot.query.runtime.operator.AggregateOperator; import org.apache.pinot.query.runtime.operator.HashJoinOperator; import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; +import org.apache.pinot.query.runtime.operator.TransformOperator; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.spi.env.PinotConfiguration; import org.slf4j.Logger; @@ -127,7 +128,9 @@ public class WorkerQueryExecutor { } else if (stageNode instanceof FilterNode) { throw new UnsupportedOperationException("Unsupported!"); } else if (stageNode instanceof ProjectNode) { - throw new UnsupportedOperationException("Unsupported!"); + ProjectNode projectNode = (ProjectNode) stageNode; + return new TransformOperator(getOperator(requestId, projectNode.getInputs().get(0), metadataMap), + projectNode.getProjects(), projectNode.getInputs().get(0).getDataSchema()); } else { throw new UnsupportedOperationException( String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName())); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java index 95a5dcd03f..8e2c7ef77a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java +++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -159,6 +159,7 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> { switch (((RexExpression.FunctionCall) aggCall).getFunctionName()) { case "$SUM": case "$SUM0": + case "SUM": return new SumAggregationFunction( ExpressionContext.forIdentifier( ((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString())); @@ -167,11 +168,13 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> { return new CountAggregationFunction(); case "$MIN": case "$MIN0": + case "MIN": return new MinAggregationFunction( ExpressionContext.forIdentifier( ((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString())); case "$MAX": case "$MAX0": + case "MAX": return new MaxAggregationFunction( ExpressionContext.forIdentifier( ((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString())); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java new file mode 100644 index 0000000000..10974f4002 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator; + +import java.util.HashMap; +import java.util.Map; + + +public class OperatorUtils { + + private static final Map<String, String> OPERATOR_TOKEN_MAPPING = new HashMap<>(); + + static { + OPERATOR_TOKEN_MAPPING.put("=", "EQ"); + OPERATOR_TOKEN_MAPPING.put(">", "GT"); + OPERATOR_TOKEN_MAPPING.put("<", "LT"); + OPERATOR_TOKEN_MAPPING.put("?", "HOOK"); + OPERATOR_TOKEN_MAPPING.put(":", "COLON"); + OPERATOR_TOKEN_MAPPING.put("<=", "LE"); + OPERATOR_TOKEN_MAPPING.put(">=", "GE"); + OPERATOR_TOKEN_MAPPING.put("<>", "NE"); + OPERATOR_TOKEN_MAPPING.put("!=", "NE2"); + OPERATOR_TOKEN_MAPPING.put("+", "PLUS"); + OPERATOR_TOKEN_MAPPING.put("-", "MINUS"); + OPERATOR_TOKEN_MAPPING.put("*", "STAR"); + OPERATOR_TOKEN_MAPPING.put("/", "DIVIDE"); + OPERATOR_TOKEN_MAPPING.put("%", "PERCENT_REMAINDER"); + OPERATOR_TOKEN_MAPPING.put("||", "CONCAT"); + OPERATOR_TOKEN_MAPPING.put("=>", "NAMED_ARGUMENT_ASSIGNMENT"); + OPERATOR_TOKEN_MAPPING.put("..", "DOUBLE_PERIOD"); + OPERATOR_TOKEN_MAPPING.put("'", "QUOTE"); + OPERATOR_TOKEN_MAPPING.put("\"", "DOUBLE_QUOTE"); + OPERATOR_TOKEN_MAPPING.put("|", "VERTICAL_BAR"); + OPERATOR_TOKEN_MAPPING.put("^", "CARET"); + OPERATOR_TOKEN_MAPPING.put("$", "DOLLAR"); + } + + private OperatorUtils() { + // do not instantiate. + } + + /** + * Canonicalize function name since Logical plan uses Parser.jj extracted tokens. 
+ * @param functionName input Function name + * @return Canonicalize form of the input function name + */ + public static String canonicalizeFunctionName(String functionName) { + functionName = OPERATOR_TOKEN_MAPPING.getOrDefault(functionName, functionName); + return functionName; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java new file mode 100644 index 0000000000..6744e35d99 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.operator; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.pinot.common.function.FunctionInfo; +import org.apache.pinot.common.function.FunctionInvoker; +import org.apache.pinot.common.function.FunctionRegistry; +import org.apache.pinot.common.function.FunctionUtils; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.common.datablock.BaseDataBlock; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.query.planner.logical.RexExpression; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; + + +/** + * This basic {@code TransformOperator} implement basic transformations. + */ +public class TransformOperator extends BaseOperator<TransferableBlock> { + private static final String EXPLAIN_NAME = "TRANSFORM"; + private final BaseOperator<TransferableBlock> _upstreamOperator; + private final List<TransformOperands> _transformOperandsList; + private final int _resultColumnSize; + private final DataSchema _resultSchema; + + public TransformOperator(BaseOperator<TransferableBlock> upstreamOperator, List<RexExpression> transforms, + DataSchema upstreamDataSchema) { + _upstreamOperator = upstreamOperator; + _resultColumnSize = transforms.size(); + _transformOperandsList = new ArrayList<>(_resultColumnSize); + for (RexExpression rexExpression : transforms) { + _transformOperandsList.add(TransformOperands.toFunctionOperands(rexExpression, upstreamDataSchema)); + } + String[] columnNames = new String[_resultColumnSize]; + DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[_resultColumnSize]; + for (int i = 0; i < _resultColumnSize; i++) { + columnNames[i] = _transformOperandsList.get(i).getResultName(); + columnDataTypes[i] = 
_transformOperandsList.get(i).getResultType(); + } + _resultSchema = new DataSchema(columnNames, columnDataTypes); + } + + @Override + public List<Operator> getChildOperators() { + // WorkerExecutor doesn't use getChildOperators, returns null here. + return null; + } + + @Nullable + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + @Override + protected TransferableBlock getNextBlock() { + try { + return transform(_upstreamOperator.nextBlock()); + } catch (Exception e) { + return TransferableBlockUtils.getErrorTransferableBlock(e); + } + } + + private TransferableBlock transform(TransferableBlock block) + throws Exception { + if (TransferableBlockUtils.isEndOfStream(block)) { + return TransferableBlockUtils.getEndOfStreamTransferableBlock(); + } + List<Object[]> resultRows = new ArrayList<>(); + List<Object[]> container = block.getContainer(); + for (Object[] row : container) { + Object[] resultRow = new Object[_resultColumnSize]; + for (int i = 0; i < _resultColumnSize; i++) { + resultRow[i] = _transformOperandsList.get(i).apply(row); + } + resultRows.add(resultRow); + } + return new TransferableBlock(resultRows, _resultSchema, BaseDataBlock.Type.ROW); + } + + private static abstract class TransformOperands { + protected String _resultName; + protected DataSchema.ColumnDataType _resultType; + + public static TransformOperands toFunctionOperands(RexExpression rexExpression, DataSchema dataSchema) { + if (rexExpression instanceof RexExpression.InputRef) { + return new ReferenceOperands((RexExpression.InputRef) rexExpression, dataSchema); + } else if (rexExpression instanceof RexExpression.FunctionCall) { + return new FunctionOperands((RexExpression.FunctionCall) rexExpression, dataSchema); + } else { + throw new UnsupportedOperationException("Unsupported RexExpression: " + rexExpression); + } + } + + public String getResultName() { + return _resultName; + } + + public DataSchema.ColumnDataType getResultType() { + return _resultType; + } + 
+ public abstract Object apply(Object[] row); + } + + private static class FunctionOperands extends TransformOperands { + private final List<TransformOperands> _childOperandList; + private final FunctionInvoker _functionInvoker; + private final Object[] _reusableOperandHolder; + + public FunctionOperands(RexExpression.FunctionCall functionCall, DataSchema dataSchema) { + // iteratively resolve child operands. + List<RexExpression> operandExpressions = functionCall.getFunctionOperands(); + _childOperandList = new ArrayList<>(operandExpressions.size()); + for (RexExpression childRexExpression : operandExpressions) { + _childOperandList.add(toFunctionOperands(childRexExpression, dataSchema)); + } + FunctionInfo functionInfo = FunctionRegistry.getFunctionInfo( + OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName()), operandExpressions.size()); + Preconditions.checkNotNull(functionInfo, "Cannot find function with Name: " + + functionCall.getFunctionName()); + _functionInvoker = new FunctionInvoker(functionInfo); + _resultName = computeColumnName(functionCall.getFunctionName(), _childOperandList); + _resultType = FunctionUtils.getColumnDataType(_functionInvoker.getResultClass()); + _reusableOperandHolder = new Object[operandExpressions.size()]; + } + + @Override + public Object apply(Object[] row) { + for (int i = 0; i < _childOperandList.size(); i++) { + _reusableOperandHolder[i] = _childOperandList.get(i).apply(row); + } + return _functionInvoker.invoke(_reusableOperandHolder); + } + + private static String computeColumnName(String functionName, List<TransformOperands> childOperands) { + StringBuilder sb = new StringBuilder(); + sb.append(functionName); + sb.append("("); + for (TransformOperands operands : childOperands) { + sb.append(operands.getResultName()); + sb.append(","); + } + sb.append(")"); + return sb.toString(); + } + } + + private static class ReferenceOperands extends TransformOperands { + private final int _refIndex; + + public 
ReferenceOperands(RexExpression.InputRef inputRef, DataSchema dataSchema) { + _refIndex = inputRef.getIndex(); + _resultType = dataSchema.getColumnDataType(_refIndex); + _resultName = dataSchema.getColumnName(_refIndex); + } + + @Override + public Object apply(Object[] row) { + return row[_refIndex]; + } + } +} diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java index b409a4b063..ae75dd158b 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java @@ -182,8 +182,25 @@ public class QueryRunnerTest { // Aggregation with multiple group key new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 GROUP BY a.col1, a.col2", 5}, - // Aggregation without group by + // Aggregation without GROUP BY new Object[]{"SELECT COUNT(*) FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'", 1}, + + // project in intermediate stage + // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1), + // col1 on both are "foo", "bar", "alice", "bob", "charlie" + // col2 on both are "foo", "bar", "alice", "foo", "bar", + // filtered at : ^ ^ + // thus the final JOIN result will have 6 rows: 3 "foo" <-> "foo"; and 3 "bob" <-> "bob" + new Object[]{"SELECT a.col1, a.col2, a.ts, b.col1, b.col3 FROM a JOIN b ON a.col1 = b.col2 " + + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0", 6}, + + // Making transform after JOIN, number of rows should be the same as JOIN result. + new Object[]{"SELECT a.col1, a.ts, a.col3 - b.col3 FROM a JOIN b ON a.col1 = b.col2 " + + " WHERE a.col3 >= 0 AND b.col3 >= 0", 15}, + + // Making transform after GROUP-BY, number of rows should be the same as GROUP-BY result. 
+ new Object[]{"SELECT a.col1, a.col2, SUM(a.col3) - MIN(a.col3) FROM a" + + " WHERE a.col3 >= 0 GROUP BY a.col1, a.col2", 5}, }; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org