This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7b8c8a0fe8 [multistage] [testing] Filter Operator Unit Test (#9792) 7b8c8a0fe8 is described below commit 7b8c8a0fe8fca07d0cad66d0bfa4d02d31dc74c6 Author: Yao Liu <y...@startree.ai> AuthorDate: Tue Nov 15 11:39:33 2022 -0800 [multistage] [testing] Filter Operator Unit Test (#9792) --- .../pinot/query/planner/logical/RexExpression.java | 6 + .../pinot/query/planner/logical/StagePlanner.java | 2 +- .../query/runtime/operator/FilterOperator.java | 14 +- .../runtime/operator/operands/FilterOperand.java | 52 ++-- .../query/runtime/operator/FilterOperatorTest.java | 288 +++++++++++++++++++++ .../query/runtime/operator/OperatorTestUtil.java | 5 + 6 files changed, 345 insertions(+), 22 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java index 3e964e77d0..e9a4a99679 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java @@ -203,12 +203,18 @@ public interface RexExpression { } class FunctionCall implements RexExpression { + // the underlying SQL operator kind of this function. + // It can be either a standard SQL operator or an extended function kind. + // @see #SqlKind.FUNCTION, #SqlKind.OTHER, #SqlKind.OTHER_FUNCTION @ProtoProperties private SqlKind _sqlKind; + // the return data type of the function. @ProtoProperties private FieldSpec.DataType _dataType; + // the name of the SQL function. For standard SqlKind it should match the SqlKind ENUM name. @ProtoProperties private String _functionName; + // the list of RexExpressions that represents the operands to the function. 
@ProtoProperties private List<RexExpression> _functionOperands; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java index 323e7f506b..2d61856c85 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java @@ -41,7 +41,7 @@ import org.apache.pinot.query.routing.WorkerManager; * This class is non-threadsafe. Do not reuse the stage planner for multiple query plans. */ public class StagePlanner { - private final PlannerContext _plannerContext; + private final PlannerContext _plannerContext; // DO NOT REMOVE. private final WorkerManager _workerManager; private int _stageIdCounter; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java index f639020b25..66956a95ec 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java @@ -30,7 +30,19 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.operands.FilterOperand; - +/* + FilterOperator applies a filter on rows from upstreamOperator. + There are three types of filter operands + 1) inputRef + 2) Literal + 3) FunctionOperand + All three types' result has to be a boolean to be used to filter rows. + FunctionOperand supports, + 1) AND, OR, NOT functions to combine operands. 
+ 2) Binary Operand: equals, notEquals, greaterThan, greaterThanOrEqual, lessThan, lessThanOrEqual + 3) All boolean scalar functions we have that take transformOperand. + Note: Scalar functions are the ones we have in v1 engine and only do function name and arg # matching. + */ public class FilterOperator extends BaseOperator<TransferableBlock> { private static final String EXPLAIN_NAME = "FILTER"; private final Operator<TransferableBlock> _upstreamOperator; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java index 3c58d1710a..b152b33b88 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java @@ -41,69 +41,73 @@ public abstract class FilterOperand extends TransformOperand { } } - public static FilterOperand toFilterOperand(RexExpression.Literal literal) { + private static FilterOperand toFilterOperand(RexExpression.Literal literal) { return new BooleanLiteral(literal); } - public static FilterOperand toFilterOperand(RexExpression.InputRef inputRef, DataSchema dataSchema) { + private static FilterOperand toFilterOperand(RexExpression.InputRef inputRef, DataSchema dataSchema) { return new BooleanInputRef(inputRef, dataSchema); } - public static FilterOperand toFilterOperand(RexExpression.FunctionCall functionCall, DataSchema dataSchema) { - + private static FilterOperand toFilterOperand(RexExpression.FunctionCall functionCall, DataSchema dataSchema) { + int operandSize = functionCall.getFunctionOperands().size(); + // TODO: Move these functions out of this class. 
switch (OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName())) { case "AND": + Preconditions.checkState(operandSize >= 2, "AND takes >=2 argument, passed in argument size:" + operandSize); return new And(functionCall.getFunctionOperands(), dataSchema); case "OR": + Preconditions.checkState(operandSize >= 2, "OR takes >=2 argument, passed in argument size:" + operandSize); return new Or(functionCall.getFunctionOperands(), dataSchema); case "NOT": + Preconditions.checkState(operandSize == 1, "NOT takes one argument, passed in argument size:" + operandSize); return new Not(toFilterOperand(functionCall.getFunctionOperands().get(0), dataSchema)); case "equals": return new Predicate(functionCall.getFunctionOperands(), dataSchema) { @Override public Boolean apply(Object[] row) { - return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo( - _resultType.convert(_rhs.apply(row))) == 0; + return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row))) + == 0; } }; case "notEquals": return new Predicate(functionCall.getFunctionOperands(), dataSchema) { @Override public Boolean apply(Object[] row) { - return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo( - _resultType.convert(_rhs.apply(row))) != 0; + return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row))) + != 0; } }; case "greaterThan": return new Predicate(functionCall.getFunctionOperands(), dataSchema) { @Override public Boolean apply(Object[] row) { - return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo( - _resultType.convert(_rhs.apply(row))) > 0; + return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row))) + > 0; } }; case "greaterThanOrEqual": return new Predicate(functionCall.getFunctionOperands(), dataSchema) { @Override public Boolean apply(Object[] row) { - return ((Comparable) 
_resultType.convert(_lhs.apply(row))).compareTo( - _resultType.convert(_rhs.apply(row))) >= 0; + return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row))) + >= 0; } }; case "lessThan": return new Predicate(functionCall.getFunctionOperands(), dataSchema) { @Override public Boolean apply(Object[] row) { - return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo( - _resultType.convert(_rhs.apply(row))) < 0; + return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row))) + < 0; } }; case "lessThanOrEqual": return new Predicate(functionCall.getFunctionOperands(), dataSchema) { @Override public Boolean apply(Object[] row) { - return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo( - _resultType.convert(_rhs.apply(row))) <= 0; + return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row))) + <= 0; } }; default: @@ -119,7 +123,8 @@ public abstract class FilterOperand extends TransformOperand { public BooleanFunction(RexExpression.FunctionCall functionCall, DataSchema dataSchema) { FunctionOperand func = (FunctionOperand) TransformOperand.toTransformOperand(functionCall, dataSchema); - Preconditions.checkState(func.getResultType() == DataSchema.ColumnDataType.BOOLEAN); + Preconditions.checkState(func.getResultType() == DataSchema.ColumnDataType.BOOLEAN, + "Expecting boolean result type but got type:" + func.getResultType()); _func = func; } @@ -133,8 +138,9 @@ public abstract class FilterOperand extends TransformOperand { private final RexExpression.InputRef _inputRef; public BooleanInputRef(RexExpression.InputRef inputRef, DataSchema dataSchema) { - Preconditions.checkState(dataSchema.getColumnDataType(inputRef.getIndex()) - == DataSchema.ColumnDataType.BOOLEAN); + DataSchema.ColumnDataType inputType = dataSchema.getColumnDataType(inputRef.getIndex()); + Preconditions.checkState(inputType == 
DataSchema.ColumnDataType.BOOLEAN, + "Input has to be boolean type but got type:" + inputType); _inputRef = inputRef; } @@ -148,7 +154,8 @@ public abstract class FilterOperand extends TransformOperand { private final Object _literalValue; public BooleanLiteral(RexExpression.Literal literal) { - Preconditions.checkState(literal.getDataType() == FieldSpec.DataType.BOOLEAN); + Preconditions.checkState(literal.getDataType() == FieldSpec.DataType.BOOLEAN, + "Only boolean literal is supported as filter, but got type:" + literal.getDataType()); _literalValue = literal.getValue(); } @@ -160,6 +167,7 @@ public abstract class FilterOperand extends TransformOperand { private static class And extends FilterOperand { List<FilterOperand> _childOperands; + public And(List<RexExpression> childExprs, DataSchema dataSchema) { _childOperands = new ArrayList<>(childExprs.size()); for (RexExpression childExpr : childExprs) { @@ -180,6 +188,7 @@ public abstract class FilterOperand extends TransformOperand { private static class Or extends FilterOperand { List<FilterOperand> _childOperands; + public Or(List<RexExpression> childExprs, DataSchema dataSchema) { _childOperands = new ArrayList<>(childExprs.size()); for (RexExpression childExpr : childExprs) { @@ -200,6 +209,7 @@ public abstract class FilterOperand extends TransformOperand { private static class Not extends FilterOperand { FilterOperand _childOperand; + public Not(FilterOperand childOperand) { _childOperand = childOperand; } @@ -216,6 +226,8 @@ public abstract class FilterOperand extends TransformOperand { protected final DataSchema.ColumnDataType _resultType; public Predicate(List<RexExpression> functionOperands, DataSchema dataSchema) { + Preconditions.checkState(functionOperands.size() == 2, + "Expected 2 function ops for Predicate but got:" + functionOperands.size()); _lhs = TransformOperand.toTransformOperand(functionOperands.get(0), dataSchema); _rhs = TransformOperand.toTransformOperand(functionOperands.get(1), 
dataSchema); if (_lhs._resultType != null && _lhs._resultType != DataSchema.ColumnDataType.OBJECT) { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java new file mode 100644 index 0000000000..7b1f5bce5f --- /dev/null +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.operator; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.calcite.sql.SqlKind; +import org.apache.pinot.common.datablock.DataBlock; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.query.planner.logical.RexExpression; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.spi.data.FieldSpec; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class FilterOperatorTest { + private AutoCloseable _mocks; + @Mock + private Operator<TransferableBlock> _upstreamOperator; + + @BeforeMethod + public void setUp() { + _mocks = MockitoAnnotations.openMocks(this); + } + + @AfterMethod + public void tearDown() + throws Exception { + _mocks.close(); + } + + @Test + public void shouldPropagateUpstreamErrorBlock() { + Mockito.when(_upstreamOperator.nextBlock()) + .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("filterError"))); + RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true); + DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.BOOLEAN + }); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + TransferableBlock errorBlock = op.getNextBlock(); + Assert.assertTrue(errorBlock.isErrorBlock()); + DataBlock error = errorBlock.getDataBlock(); + Assert.assertTrue(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError")); + } + + @Test + public void 
shouldPropagateUpstreamEOS() { + RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true); + + DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT + }); + Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + TransferableBlock dataBlock = op.getNextBlock(); + Assert.assertTrue(dataBlock.isEndOfStreamBlock()); + } + + @Test + public void shouldPropagateUpstreamNoop() { + RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true); + + DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT + }); + Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + TransferableBlock dataBlock = op.getNextBlock(); + Assert.assertTrue(dataBlock.isNoOpBlock()); + } + + @Test + public void shouldHandleTrueBooleanLiteralFilter() { + RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true); + + DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT + }); + Mockito.when(_upstreamOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new Object[]{1})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + TransferableBlock dataBlock = op.getNextBlock(); + Assert.assertFalse(dataBlock.isErrorBlock()); + List<Object[]> result = dataBlock.getContainer(); + Assert.assertEquals(result.size(), 2); + Assert.assertEquals(result.get(0)[0], 0); + 
Assert.assertEquals(result.get(1)[0], 1); + } + + @Test + public void shouldHandleFalseBooleanLiteralFilter() { + RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, false); + + DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT + }); + Mockito.when(_upstreamOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + TransferableBlock dataBlock = op.getNextBlock(); + Assert.assertFalse(dataBlock.isErrorBlock()); + List<Object[]> result = dataBlock.getContainer(); + Assert.assertTrue(result.isEmpty()); + } + + @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*boolean literal.*") + public void shouldThrowOnNonBooleanTypeBooleanLiteral() { + RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "false"); + DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT + }); + Mockito.when(_upstreamOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + } + + @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*Input has to be " + + "boolean type.*") + public void shouldThrowOnNonBooleanTypeInputRef() { + RexExpression ref0 = new RexExpression.InputRef(0); + DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT + }); + Mockito.when(_upstreamOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0); + } + + 
@Test + public void shouldHandleBooleanInputRef() { + RexExpression ref1 = new RexExpression.InputRef(1); + DataSchema inputSchema = new DataSchema(new String[]{"intCol", "boolCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.BOOLEAN + }); + Mockito.when(_upstreamOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true}, new Object[]{2, false})); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref1); + TransferableBlock dataBlock = op.getNextBlock(); + Assert.assertFalse(dataBlock.isErrorBlock()); + List<Object[]> result = dataBlock.getContainer(); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(0)[0], 1); + Assert.assertEquals(result.get(0)[1], true); + } + + @Test + public void shouldHandleAndFilter() { + DataSchema inputSchema = new DataSchema(new String[]{"boolCol0", "boolCol1"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN + }); + Mockito.when(_upstreamOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new Object[]{false, false}, + new Object[]{true, false})); + RexExpression.FunctionCall andCall = new RexExpression.FunctionCall(SqlKind.AND, FieldSpec.DataType.BOOLEAN, "AND", + ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); + + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, andCall); + TransferableBlock dataBlock = op.getNextBlock(); + Assert.assertFalse(dataBlock.isErrorBlock()); + List<Object[]> result = dataBlock.getContainer(); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(0)[0], true); + Assert.assertEquals(result.get(0)[1], true); + } + + @Test + public void shouldHandleOrFilter() { + DataSchema inputSchema = new DataSchema(new String[]{"boolCol0", "boolCol1"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.BOOLEAN, 
DataSchema.ColumnDataType.BOOLEAN + }); + Mockito.when(_upstreamOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new Object[]{false, false}, + new Object[]{true, false})); + RexExpression.FunctionCall orCall = new RexExpression.FunctionCall(SqlKind.OR, FieldSpec.DataType.BOOLEAN, "OR", + ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); + + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, orCall); + TransferableBlock dataBlock = op.getNextBlock(); + Assert.assertFalse(dataBlock.isErrorBlock()); + List<Object[]> result = dataBlock.getContainer(); + Assert.assertEquals(result.size(), 2); + Assert.assertEquals(result.get(0)[0], true); + Assert.assertEquals(result.get(0)[1], true); + Assert.assertEquals(result.get(1)[0], true); + Assert.assertEquals(result.get(1)[1], false); + } + + @Test + public void shouldHandleNotFilter() { + DataSchema inputSchema = new DataSchema(new String[]{"boolCol0", "boolCol1"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN + }); + Mockito.when(_upstreamOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new Object[]{false, false}, + new Object[]{true, false})); + RexExpression.FunctionCall notCall = new RexExpression.FunctionCall(SqlKind.NOT, FieldSpec.DataType.BOOLEAN, "NOT", + ImmutableList.of(new RexExpression.InputRef(0))); + + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, notCall); + TransferableBlock dataBlock = op.getNextBlock(); + Assert.assertFalse(dataBlock.isErrorBlock()); + List<Object[]> result = dataBlock.getContainer(); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(0)[0], false); + Assert.assertEquals(result.get(0)[1], false); + } + + @Test + public void shouldHandleGreaterThanFilter() { + DataSchema inputSchema = new DataSchema(new String[]{"int0", "int1"}, new 
DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT + }); + Mockito.when(_upstreamOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, 2}, new Object[]{3, 2}, new Object[]{1, 1})); + RexExpression.FunctionCall greaterThan = + new RexExpression.FunctionCall(SqlKind.GREATER_THAN, FieldSpec.DataType.BOOLEAN, "greaterThan", + ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, greaterThan); + TransferableBlock dataBlock = op.getNextBlock(); + Assert.assertFalse(dataBlock.isErrorBlock()); + List<Object[]> result = dataBlock.getContainer(); + List<Object[]> expectedResult = ImmutableList.of(new Object[]{3, 2}); + Assert.assertEquals(result.size(), expectedResult.size()); + Assert.assertEquals(result.get(0), expectedResult.get(0)); + } + + @Test + public void shouldHandleBooleanFunction() { + DataSchema inputSchema = new DataSchema(new String[]{"string1"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.STRING + }); + Mockito.when(_upstreamOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{"starTree"}, new Object[]{"treeStar"})); + RexExpression.FunctionCall startsWith = + new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWith", + ImmutableList.of(new RexExpression.InputRef(0), + new RexExpression.Literal(FieldSpec.DataType.STRING, "star"))); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith); + TransferableBlock dataBlock = op.getNextBlock(); + Assert.assertFalse(dataBlock.isErrorBlock()); + List<Object[]> result = dataBlock.getContainer(); + List<Object[]> expectedResult = ImmutableList.of(new Object[]{"starTree"}); + Assert.assertEquals(result.size(), expectedResult.size()); + Assert.assertEquals(result.get(0), expectedResult.get(0)); + } + + @Test(expectedExceptions = 
NullPointerException.class, expectedExceptionsMessageRegExp = ".*Cannot find function " + + "with Name: startsWithError.*") + public void shouldThrowOnUnfoundFunction() { + DataSchema inputSchema = new DataSchema(new String[]{"string1"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.STRING + }); + Mockito.when(_upstreamOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{"starTree"}, new Object[]{"treeStar"})); + RexExpression.FunctionCall startsWith = + new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWithError", + ImmutableList.of(new RexExpression.InputRef(0), + new RexExpression.Literal(FieldSpec.DataType.STRING, "star"))); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith); + } +} diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java index 86b65a1686..0537c67ca9 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator; import java.util.Arrays; import java.util.List; +import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.query.runtime.blocks.TransferableBlock; @@ -57,4 +58,8 @@ public class OperatorTestUtil { public static DataSchema getDataSchema(String operatorName) { return MOCK_OPERATOR_FACTORY.getDataSchema(operatorName); } + + public static TransferableBlock block(DataSchema schema, Object[]... 
rows) { + return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org