This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2987f7ad08 [multistage] [testing] Add transform operator unit test 
(#9791)
2987f7ad08 is described below

commit 2987f7ad082cf8f6abd7a9f82d4518734b3738d3
Author: Yao Liu <y...@startree.ai>
AuthorDate: Tue Nov 15 11:39:49 2022 -0800

    [multistage] [testing] Add transform operator unit test (#9791)
---
 .../pinot/common/function/FunctionRegistry.java    |   2 +-
 .../query/runtime/operator/TransformOperator.java  |  16 +-
 .../runtime/operator/operands/FunctionOperand.java |   5 +-
 .../runtime/operator/HashJoinOperatorTest.java     |  18 +-
 .../runtime/operator/TransformOperatorTest.java    | 246 +++++++++++++++++++++
 5 files changed, 275 insertions(+), 12 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java
index 4d786b2ed5..82a16720e8 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java
@@ -50,7 +50,7 @@ public class FunctionRegistry {
   // TODO: consolidate the following 2
   // This FUNCTION_INFO_MAP is used by Pinot server to look up function by # 
of arguments
   private static final Map<String, Map<Integer, FunctionInfo>> 
FUNCTION_INFO_MAP = new HashMap<>();
-  // This FUNCTION_MAP is used by Calcite function catalog tolook up function 
by function signature.
+  // This FUNCTION_MAP is used by Calcite function catalog to look up function 
by function signature.
   private static final NameMultimap<Function> FUNCTION_MAP = new 
NameMultimap<>();
 
   /**
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index ff74602d34..e00d4a9eed 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
@@ -33,24 +34,35 @@ import 
org.apache.pinot.query.runtime.operator.operands.TransformOperand;
 
 /**
  * This basic {@code TransformOperator} implement basic transformations.
+ *
+ * This operator performs three kinds of transform
+ * - InputRef transform, which reads from certain input column based on column 
index
+ * - Literal transform, which outputs literal value
+ * - Function transform, which runs a function on function operands. Function 
operands can be any of the 3 transforms.
+ * Note: Function transform only runs functions from v1 engine scalar function 
factory, which only does argument count
+ * and canonicalized function name matching (lower case).
  */
 public class TransformOperator extends BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "TRANSFORM";
   private final Operator<TransferableBlock> _upstreamOperator;
   private final List<TransformOperand> _transformOperandsList;
   private final int _resultColumnSize;
+  // TODO: Check type matching between resultSchema and the actual result.
   private final DataSchema _resultSchema;
   private TransferableBlock _upstreamErrorBlock;
 
-  public TransformOperator(Operator<TransferableBlock> upstreamOperator, 
DataSchema dataSchema,
+  public TransformOperator(Operator<TransferableBlock> upstreamOperator, 
DataSchema resultSchema,
       List<RexExpression> transforms, DataSchema upstreamDataSchema) {
+    Preconditions.checkState(!transforms.isEmpty(), "transform operand should 
not be empty.");
+    Preconditions.checkState(resultSchema.size() == transforms.size(),
+        "result schema size:" + resultSchema.size() + " doesn't match 
transform operand size:" + transforms.size());
     _upstreamOperator = upstreamOperator;
     _resultColumnSize = transforms.size();
     _transformOperandsList = new ArrayList<>(_resultColumnSize);
     for (RexExpression rexExpression : transforms) {
       
_transformOperandsList.add(TransformOperand.toTransformOperand(rexExpression, 
upstreamDataSchema));
     }
-    _resultSchema = dataSchema;
+    _resultSchema = resultSchema;
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
index 97fae1d57b..029f5ddaad 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
@@ -29,7 +29,9 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.operator.OperatorUtils;
 
-
+/**
+ * FunctionOperands are generated from {@link RexExpression}s.
+ */
 public class FunctionOperand extends TransformOperand {
   private final List<TransformOperand> _childOperandList;
   private final FunctionInvoker _functionInvoker;
@@ -48,6 +50,7 @@ public class FunctionOperand extends TransformOperand {
     Preconditions.checkNotNull(functionInfo, "Cannot find function with Name: 
" + functionCall.getFunctionName());
     _functionInvoker = new FunctionInvoker(functionInfo);
     _resultName = computeColumnName(functionCall.getFunctionName(), 
_childOperandList);
+    // TODO: Check type match between functionCall's data type and result type.
     _resultType = 
FunctionUtils.getColumnDataType(_functionInvoker.getResultClass());
     _reusableOperandHolder = new Object[operandExpressions.size()];
   }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 65bb2192c2..0cf912c279 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -32,7 +32,6 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 
-
 public class HashJoinOperatorTest {
   private static JoinNode.JoinKeys getJoinKeys(List<Integer> leftIdx, 
List<Integer> rightIdx) {
     FieldSelectionKeySelector leftSelect = new 
FieldSelectionKeySelector(leftIdx);
@@ -49,8 +48,9 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, 
resultSchema,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, 
JoinRelType.INNER);
+    HashJoinOperator join =
+        new HashJoinOperator(leftOperator, rightOperator, resultSchema, 
getJoinKeys(Arrays.asList(1), Arrays.asList(1)),
+            joinClauses, JoinRelType.INNER);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -72,14 +72,15 @@ public class HashJoinOperatorTest {
   public void testInnerJoin() {
     BaseOperator<TransferableBlock> leftOperator = 
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
     BaseOperator<TransferableBlock> rightOperator = 
OperatorTestUtil.getOperator(OperatorTestUtil.OP_2);
-
     List<RexExpression> joinClauses = new ArrayList<>();
+
     DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", 
"bar"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, 
resultSchema,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, 
JoinRelType.INNER);
+    HashJoinOperator join =
+        new HashJoinOperator(leftOperator, rightOperator, resultSchema, 
getJoinKeys(Arrays.asList(1), Arrays.asList(1)),
+            joinClauses, JoinRelType.INNER);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -103,8 +104,9 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, 
resultSchema,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, 
JoinRelType.LEFT);
+    HashJoinOperator join =
+        new HashJoinOperator(leftOperator, rightOperator, resultSchema, 
getJoinKeys(Arrays.asList(1), Arrays.asList(1)),
+            joinClauses, JoinRelType.LEFT);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
new file mode 100644
index 0000000000..868ec3b9b0
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.calcite.sql.SqlKind.MINUS;
+import static org.apache.calcite.sql.SqlKind.PLUS;
+
+
+public class TransformOperatorTest {
+  private AutoCloseable _mocks;
+
+  @Mock
+  private Operator<TransferableBlock> _upstreamOp;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void shouldHandleRefTransform() {
+    DataSchema upStreamSchema = new DataSchema(new String[]{"intCol", 
"strCol"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.STRING});
+    Mockito.when(_upstreamOp.nextBlock())
+        .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{1, 
"a"}, new Object[]{2, "b"}));
+    // Output column value
+    RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
+    RexExpression.InputRef ref1 = new RexExpression.InputRef(1);
+    TransformOperator op =
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(ref0, ref1), upStreamSchema);
+    TransferableBlock result = op.nextBlock();
+
+    Assert.assertTrue(!result.isErrorBlock());
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "a"}, new 
Object[]{2, "b"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+  }
+
+  @Test
+  public void shouldHandleLiteralTransform() {
+    DataSchema upStreamSchema = new DataSchema(new String[]{"boolCol", 
"strCol"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "strCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, 
DataSchema.ColumnDataType.STRING});
+    Mockito.when(_upstreamOp.nextBlock())
+        .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{1, 
"a"}, new Object[]{2, "b"}));
+    // Set up literal operands
+    RexExpression.Literal boolLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+    RexExpression.Literal strLiteral = new 
RexExpression.Literal(FieldSpec.DataType.STRING, "str");
+    TransformOperator op =
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+    TransferableBlock result = op.nextBlock();
+    // Literal operands should just output original literals.
+    Assert.assertTrue(!result.isErrorBlock());
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{true, "str"}, new 
Object[]{true, "str"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+  }
+
+  @Test
+  public void shouldHandlePlusMinusFuncTransform() {
+    DataSchema upStreamSchema =
+        new DataSchema(new String[]{"doubleCol1", "doubleCol2"}, new 
DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE
+        });
+    Mockito.when(_upstreamOp.nextBlock())
+        .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{1.0, 
1.0}, new Object[]{2.0, 3.0}));
+    // Run a plus and minus function operand on double columns.
+    RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
+    RexExpression.InputRef ref1 = new RexExpression.InputRef(1);
+    List<RexExpression> functionOperands = ImmutableList.of(ref0, ref1);
+    RexExpression.FunctionCall plus01 =
+        new RexExpression.FunctionCall(PLUS, FieldSpec.DataType.DOUBLE, 
"plus", functionOperands);
+    RexExpression.FunctionCall minus01 =
+        new RexExpression.FunctionCall(MINUS, FieldSpec.DataType.DOUBLE, 
"minus", functionOperands);
+    DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, 
DataSchema.ColumnDataType.DOUBLE});
+    TransformOperator op =
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(plus01, minus01), upStreamSchema);
+    TransferableBlock result = op.nextBlock();
+    Assert.assertTrue(!result.isErrorBlock());
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{2.0, 0.0}, new 
Object[]{5.0, -1.0});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+  }
+
+  @Test
+  public void shouldThrowOnTypeMismatchFuncTransform() {
+    DataSchema upStreamSchema = new DataSchema(new String[]{"string1", 
"string2"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_upstreamOp.nextBlock())
+        .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{"1.0", 
"1.0"}, new Object[]{"2.0", "3.0"}));
+    // Run a plus and minus function operand on string columns.
+    RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
+    RexExpression.InputRef ref1 = new RexExpression.InputRef(1);
+    List<RexExpression> functionOperands = ImmutableList.of(ref0, ref1);
+    RexExpression.FunctionCall plus01 =
+        new RexExpression.FunctionCall(PLUS, FieldSpec.DataType.DOUBLE, 
"plus", functionOperands);
+    RexExpression.FunctionCall minus01 =
+        new RexExpression.FunctionCall(MINUS, FieldSpec.DataType.DOUBLE, 
"minus", functionOperands);
+    DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, 
DataSchema.ColumnDataType.DOUBLE});
+    TransformOperator op =
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(plus01, minus01), upStreamSchema);
+
+    TransferableBlock result = op.nextBlock();
+    Assert.assertTrue(result.isErrorBlock());
+    DataBlock data = result.getDataBlock();
+    
Assert.assertTrue(data.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("ArithmeticFunctions"));
+  }
+
+  @Test
+  public void shouldPropagateUpstreamError() {
+    DataSchema upStreamSchema = new DataSchema(new String[]{"string1", 
"string2"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_upstreamOp.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new 
Exception("transformError")));
+    RexExpression.Literal boolLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+    RexExpression.Literal strLiteral = new 
RexExpression.Literal(FieldSpec.DataType.STRING, "str");
+    DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.STRING});
+    TransformOperator op =
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+    TransferableBlock result = op.nextBlock();
+    Assert.assertTrue(result.isErrorBlock());
+    DataBlock data = result.getDataBlock();
+    
Assert.assertTrue(data.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("transformError"));
+  }
+
+  @Test
+  public void testNoopBlock() {
+    DataSchema upStreamSchema = new DataSchema(new String[]{"string1", 
"string2"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_upstreamOp.nextBlock())
+        .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{"a", 
"a"}, new Object[]{"b", "b"}))
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
+        .thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{"c", 
"c"}, new Object[]{"d", "d"}, new Object[]{
+            "e", "e"
+        }));
+    RexExpression.Literal boolLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+    RexExpression.Literal strLiteral = new 
RexExpression.Literal(FieldSpec.DataType.STRING, "str");
+    DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.STRING});
+    TransformOperator op =
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+    TransferableBlock result = op.nextBlock();
+    // First block has two rows
+    Assert.assertFalse(result.isErrorBlock());
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{true, "str"}, new 
Object[]{true, "str"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+    // Second block is NoOp
+    result = op.nextBlock();
+    Assert.assertTrue(result.isNoOpBlock());
+    // Third block has three rows.
+    result = op.nextBlock();
+    Assert.assertFalse(result.isErrorBlock());
+    resultRows = result.getContainer();
+    expectedRows = Arrays.asList(new Object[]{true, "str"}, new Object[]{true, 
"str"}, new Object[]{true, "str"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+    Assert.assertEquals(resultRows.get(2), expectedRows.get(2));
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = ".*transform operand "
+      + "should not be empty.*")
+  public void testWrongNumTransform() {
+    DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.STRING});
+    DataSchema upStreamSchema = new DataSchema(new String[]{"string1", 
"string2"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING
+    });
+    TransformOperator transform = new TransformOperator(_upstreamOp, 
resultSchema, new ArrayList<>(), upStreamSchema);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = ".*doesn't match "
+      + "transform operand size.*")
+  public void testMismatchedSchemaOperandSize() {
+    DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.STRING});
+    DataSchema upStreamSchema = new DataSchema(new String[]{"string1", 
"string2"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING
+    });
+    RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
+    TransformOperator transform =
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(ref0), upStreamSchema);
+  }
+};


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to