This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2987f7ad08 [multistage] [testing] Add transform operator unit test (#9791) 2987f7ad08 is described below commit 2987f7ad082cf8f6abd7a9f82d4518734b3738d3 Author: Yao Liu <y...@startree.ai> AuthorDate: Tue Nov 15 11:39:49 2022 -0800 [multistage] [testing] Add transform operator unit test (#9791) --- .../pinot/common/function/FunctionRegistry.java | 2 +- .../query/runtime/operator/TransformOperator.java | 16 +- .../runtime/operator/operands/FunctionOperand.java | 5 +- .../runtime/operator/HashJoinOperatorTest.java | 18 +- .../runtime/operator/TransformOperatorTest.java | 246 +++++++++++++++++++++ 5 files changed, 275 insertions(+), 12 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java index 4d786b2ed5..82a16720e8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java @@ -50,7 +50,7 @@ public class FunctionRegistry { // TODO: consolidate the following 2 // This FUNCTION_INFO_MAP is used by Pinot server to look up function by # of arguments private static final Map<String, Map<Integer, FunctionInfo>> FUNCTION_INFO_MAP = new HashMap<>(); - // This FUNCTION_MAP is used by Calcite function catalog tolook up function by function signature. + // This FUNCTION_MAP is used by Calcite function catalog to look up function by function signature. 
private static final NameMultimap<Function> FUNCTION_MAP = new NameMultimap<>(); /** diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java index ff74602d34..e00d4a9eed 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.runtime.operator; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; @@ -33,24 +34,35 @@ import org.apache.pinot.query.runtime.operator.operands.TransformOperand; /** * This basic {@code TransformOperator} implement basic transformations. + * + * This operator performs three kinds of transforms: + * - InputRef transform, which reads from a certain input column based on the column index + * - Literal transform, which outputs a literal value + * - Function transform, which runs a function on function operands. Function operands can be any of the 3 transforms. + * Note: Function transform only runs functions from v1 engine scalar function factory, which only does argument count + * and canonicalized function name matching (lower case). */ public class TransformOperator extends BaseOperator<TransferableBlock> { private static final String EXPLAIN_NAME = "TRANSFORM"; private final Operator<TransferableBlock> _upstreamOperator; private final List<TransformOperand> _transformOperandsList; private final int _resultColumnSize; + // TODO: Check type matching between resultSchema and the actual result.
private final DataSchema _resultSchema; private TransferableBlock _upstreamErrorBlock; - public TransformOperator(Operator<TransferableBlock> upstreamOperator, DataSchema dataSchema, + public TransformOperator(Operator<TransferableBlock> upstreamOperator, DataSchema resultSchema, List<RexExpression> transforms, DataSchema upstreamDataSchema) { + Preconditions.checkState(!transforms.isEmpty(), "transform operand should not be empty."); + Preconditions.checkState(resultSchema.size() == transforms.size(), + "result schema size:" + resultSchema.size() + " doesn't match transform operand size:" + transforms.size()); _upstreamOperator = upstreamOperator; _resultColumnSize = transforms.size(); _transformOperandsList = new ArrayList<>(_resultColumnSize); for (RexExpression rexExpression : transforms) { _transformOperandsList.add(TransformOperand.toTransformOperand(rexExpression, upstreamDataSchema)); } - _resultSchema = dataSchema; + _resultSchema = resultSchema; } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java index 97fae1d57b..029f5ddaad 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java @@ -29,7 +29,9 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.operator.OperatorUtils; - +/* + * FunctionOperands are generated from {@link RexExpression}s. 
+ */ public class FunctionOperand extends TransformOperand { private final List<TransformOperand> _childOperandList; private final FunctionInvoker _functionInvoker; @@ -48,6 +50,7 @@ public class FunctionOperand extends TransformOperand { Preconditions.checkNotNull(functionInfo, "Cannot find function with Name: " + functionCall.getFunctionName()); _functionInvoker = new FunctionInvoker(functionInfo); _resultName = computeColumnName(functionCall.getFunctionName(), _childOperandList); + // TODO: Check type match between functionCall's data type and result type. _resultType = FunctionUtils.getColumnDataType(_functionInvoker.getResultClass()); _reusableOperandHolder = new Object[operandExpressions.size()]; } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java index 65bb2192c2..0cf912c279 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java @@ -32,7 +32,6 @@ import org.testng.Assert; import org.testng.annotations.Test; - public class HashJoinOperatorTest { private static JoinNode.JoinKeys getJoinKeys(List<Integer> leftIdx, List<Integer> rightIdx) { FieldSelectionKeySelector leftSelect = new FieldSelectionKeySelector(leftIdx); @@ -49,8 +48,9 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING }); - HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema, - getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER); + HashJoinOperator join = + new HashJoinOperator(leftOperator, rightOperator, resultSchema, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), + joinClauses, 
JoinRelType.INNER); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -72,14 +72,15 @@ public class HashJoinOperatorTest { public void testInnerJoin() { BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1); BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_2); - List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING }); - HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema, - getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER); + HashJoinOperator join = + new HashJoinOperator(leftOperator, rightOperator, resultSchema, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), + joinClauses, JoinRelType.INNER); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -103,8 +104,9 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING }); - HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema, - getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.LEFT); + HashJoinOperator join = + new HashJoinOperator(leftOperator, rightOperator, resultSchema, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), + joinClauses, JoinRelType.LEFT); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java new file mode 100644 index 0000000000..868ec3b9b0 --- 
/dev/null +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.pinot.common.datablock.DataBlock; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.query.planner.logical.RexExpression; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.spi.data.FieldSpec; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.apache.calcite.sql.SqlKind.MINUS; +import static org.apache.calcite.sql.SqlKind.PLUS; + + +public class TransformOperatorTest { + private 
AutoCloseable _mocks; + + @Mock + private Operator<TransferableBlock> _upstreamOp; + + @BeforeMethod + public void setUp() { + _mocks = MockitoAnnotations.openMocks(this); + } + + @AfterMethod + public void tearDown() + throws Exception { + _mocks.close(); + } + + @Test + public void shouldHandleRefTransform() { + DataSchema upStreamSchema = new DataSchema(new String[]{"intCol", "strCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}); + Mockito.when(_upstreamOp.nextBlock()) + .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{1, "a"}, new Object[]{2, "b"})); + // Output column value + RexExpression.InputRef ref0 = new RexExpression.InputRef(0); + RexExpression.InputRef ref1 = new RexExpression.InputRef(1); + TransformOperator op = + new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema); + TransferableBlock result = op.nextBlock(); + + Assert.assertTrue(!result.isErrorBlock()); + List<Object[]> resultRows = result.getContainer(); + List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); + } + + @Test + public void shouldHandleLiteralTransform() { + DataSchema upStreamSchema = new DataSchema(new String[]{"boolCol", "strCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.STRING + }); + DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "strCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.STRING}); + Mockito.when(_upstreamOp.nextBlock()) + 
.thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{1, "a"}, new Object[]{2, "b"})); + // Set up literal operands + RexExpression.Literal boolLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true); + RexExpression.Literal strLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "str"); + TransformOperator op = + new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema); + TransferableBlock result = op.nextBlock(); + // Literal operands should just output original literals. + Assert.assertTrue(!result.isErrorBlock()); + List<Object[]> resultRows = result.getContainer(); + List<Object[]> expectedRows = Arrays.asList(new Object[]{true, "str"}, new Object[]{true, "str"}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); + } + + @Test + public void shouldHandlePlusMinusFuncTransform() { + DataSchema upStreamSchema = + new DataSchema(new String[]{"doubleCol1", "doubleCol2"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE + }); + Mockito.when(_upstreamOp.nextBlock()) + .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{1.0, 1.0}, new Object[]{2.0, 3.0})); + // Run a plus and minus function operand on double columns. 
+ RexExpression.InputRef ref0 = new RexExpression.InputRef(0); + RexExpression.InputRef ref1 = new RexExpression.InputRef(1); + List<RexExpression> functionOperands = ImmutableList.of(ref0, ref1); + RexExpression.FunctionCall plus01 = + new RexExpression.FunctionCall(PLUS, FieldSpec.DataType.DOUBLE, "plus", functionOperands); + RexExpression.FunctionCall minus01 = + new RexExpression.FunctionCall(MINUS, FieldSpec.DataType.DOUBLE, "minus", functionOperands); + DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE}); + TransformOperator op = + new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema); + TransferableBlock result = op.nextBlock(); + Assert.assertTrue(!result.isErrorBlock()); + List<Object[]> resultRows = result.getContainer(); + List<Object[]> expectedRows = Arrays.asList(new Object[]{2.0, 0.0}, new Object[]{5.0, -1.0}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); + } + + @Test + public void shouldThrowOnTypeMismatchFuncTransform() { + DataSchema upStreamSchema = new DataSchema(new String[]{"string1", "string2"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_upstreamOp.nextBlock()) + .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{"1.0", "1.0"}, new Object[]{"2.0", "3.0"})); + // Run a plus and minus function operand on string columns. 
+ RexExpression.InputRef ref0 = new RexExpression.InputRef(0); + RexExpression.InputRef ref1 = new RexExpression.InputRef(1); + List<RexExpression> functionOperands = ImmutableList.of(ref0, ref1); + RexExpression.FunctionCall plus01 = + new RexExpression.FunctionCall(PLUS, FieldSpec.DataType.DOUBLE, "plus", functionOperands); + RexExpression.FunctionCall minus01 = + new RexExpression.FunctionCall(MINUS, FieldSpec.DataType.DOUBLE, "minus", functionOperands); + DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE}); + TransformOperator op = + new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema); + + TransferableBlock result = op.nextBlock(); + Assert.assertTrue(result.isErrorBlock()); + DataBlock data = result.getDataBlock(); + Assert.assertTrue(data.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("ArithmeticFunctions")); + } + + @Test + public void shouldPropagateUpstreamError() { + DataSchema upStreamSchema = new DataSchema(new String[]{"string1", "string2"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_upstreamOp.nextBlock()) + .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("transformError"))); + RexExpression.Literal boolLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true); + RexExpression.Literal strLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "str"); + DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}); + TransformOperator op = + new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema); + TransferableBlock result = op.nextBlock(); + 
Assert.assertTrue(result.isErrorBlock()); + DataBlock data = result.getDataBlock(); + Assert.assertTrue(data.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("transformError")); + } + + @Test + public void testNoopBlock() { + DataSchema upStreamSchema = new DataSchema(new String[]{"string1", "string2"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_upstreamOp.nextBlock()) + .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{"a", "a"}, new Object[]{"b", "b"})) + .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()) + .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{"c", "c"}, new Object[]{"d", "d"}, new Object[]{ + "e", "e" + })); + RexExpression.Literal boolLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true); + RexExpression.Literal strLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "str"); + DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}); + TransformOperator op = + new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema); + TransferableBlock result = op.nextBlock(); + // First block has two rows + Assert.assertFalse(result.isErrorBlock()); + List<Object[]> resultRows = result.getContainer(); + List<Object[]> expectedRows = Arrays.asList(new Object[]{true, "str"}, new Object[]{true, "str"}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); + // Second block is a NoOp block + result = op.nextBlock(); + Assert.assertTrue(result.isNoOpBlock()); + // Third block has three rows.
+ result = op.nextBlock(); + Assert.assertFalse(result.isErrorBlock()); + resultRows = result.getContainer(); + expectedRows = Arrays.asList(new Object[]{true, "str"}, new Object[]{true, "str"}, new Object[]{true, "str"}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); + Assert.assertEquals(resultRows.get(2), expectedRows.get(2)); + } + + @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*transform operand " + + "should not be empty.*") + public void testWrongNumTransform() { + DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}); + DataSchema upStreamSchema = new DataSchema(new String[]{"string1", "string2"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING + }); + TransformOperator transform = new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), upStreamSchema); + } + + @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*doesn't match " + + "transform operand size.*") + public void testMismatchedSchemaOperandSize() { + DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}); + DataSchema upStreamSchema = new DataSchema(new String[]{"string1", "string2"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING + }); + RexExpression.InputRef ref0 = new RexExpression.InputRef(0); + TransformOperator transform = + new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0), upStreamSchema); + } +}; --------------------------------------------------------------------- To unsubscribe, 
e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org