This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ec47f5af23 [multistage] [testing] Add join operator unit test (#9775) ec47f5af23 is described below commit ec47f5af234c22734590dcd21c990d04b62d129a Author: Yao Liu <y...@startree.ai> AuthorDate: Thu Nov 17 19:35:48 2022 -0800 [multistage] [testing] Add join operator unit test (#9775) --- .../query/runtime/operator/HashJoinOperator.java | 55 ++- .../pinot/query/service/QueryDispatcher.java | 18 +- .../runtime/operator/AggregateOperatorTest.java | 17 +- .../runtime/operator/HashJoinOperatorTest.java | 549 +++++++++++++++++++-- 4 files changed, 545 insertions(+), 94 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index ce0f19177f..a15afee20e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -18,10 +18,13 @@ */ package org.apache.pinot.query.runtime.operator; +import com.clearspring.analytics.util.Preconditions; +import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Set; import javax.annotation.Nullable; import org.apache.calcite.rel.core.JoinRelType; import org.apache.pinot.common.datablock.DataBlock; @@ -48,9 +51,10 @@ import org.apache.pinot.query.runtime.operator.operands.FilterOperand; * We currently support left join, inner join and semi join. * The output is in the format of [left_row, right_row] */ +// TODO: Move inequi out of hashjoin. 
(https://github.com/apache/pinot/issues/9728) public class HashJoinOperator extends BaseOperator<TransferableBlock> { - private static final String EXPLAIN_NAME = "BROADCAST_JOIN"; - + private static final String EXPLAIN_NAME = "HASH_JOIN"; + private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT); private final HashMap<Key, List<Object[]>> _broadcastHashTable; private final Operator<TransferableBlock> _leftTableOperator; private final Operator<TransferableBlock> _rightTableOperator; @@ -63,13 +67,14 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> { private KeySelector<Object[], Object[]> _leftKeySelector; private KeySelector<Object[], Object[]> _rightKeySelector; - // TODO: Fix inequi join bug. (https://github.com/apache/pinot/issues/9728) - // TODO: Double check semi join logic. public HashJoinOperator(Operator<TransferableBlock> leftTableOperator, Operator<TransferableBlock> rightTableOperator, DataSchema outputSchema, JoinNode.JoinKeys joinKeys, List<RexExpression> joinClauses, JoinRelType joinType) { - // TODO: Handle the case where _leftKeySelector and _rightKeySelector could be null. 
+ Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(joinType), + "Join type: " + joinType + " is not supported!"); _leftKeySelector = joinKeys.getLeftJoinKeySelector(); _rightKeySelector = joinKeys.getRightJoinKeySelector(); + Preconditions.checkState(_leftKeySelector != null, "LeftKeySelector for join cannot be null"); + Preconditions.checkState(_rightKeySelector != null, "RightKeySelector for join cannot be null"); _leftTableOperator = leftTableOperator; _rightTableOperator = rightTableOperator; _resultSchema = outputSchema; @@ -98,19 +103,17 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> { @Override protected TransferableBlock getNextBlock() { - if (!_isHashTableBuilt) { - // Build JOIN hash table - buildBroadcastHashTable(); - } - - if (_upstreamErrorBlock != null) { - return _upstreamErrorBlock; - } else if (!_isHashTableBuilt) { - return TransferableBlockUtils.getNoOpTransferableBlock(); - } - - // JOIN each left block with the right block. try { + if (!_isHashTableBuilt) { + // Build JOIN hash table + buildBroadcastHashTable(); + } + if (_upstreamErrorBlock != null) { + return _upstreamErrorBlock; + } else if (!_isHashTableBuilt) { + return TransferableBlockUtils.getNoOpTransferableBlock(); + } + // JOIN each left block with the right block. 
return buildJoinedDataBlock(_leftTableOperator.nextBlock()); } catch (Exception e) { return TransferableBlockUtils.getErrorTransferableBlock(e); @@ -145,30 +148,30 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> { if (leftBlock.isErrorBlock()) { _upstreamErrorBlock = leftBlock; return _upstreamErrorBlock; - } else if (TransferableBlockUtils.isEndOfStream(leftBlock) || TransferableBlockUtils.isNoOpBlock(leftBlock)) { + } else if (TransferableBlockUtils.isNoOpBlock(leftBlock) || TransferableBlockUtils.isEndOfStream(leftBlock)) { return leftBlock; } - List<Object[]> rows = new ArrayList<>(); - List<Object[]> container = leftBlock.getContainer(); + List<Object[]> container = leftBlock.isEndOfStreamBlock() ? new ArrayList<>() : leftBlock.getContainer(); for (Object[] leftRow : container) { - List<Object[]> hashCollection = _broadcastHashTable.getOrDefault( - new Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList()); + // NOTE: Empty key selector will always give same hash code. + List<Object[]> hashCollection = + _broadcastHashTable.getOrDefault(new Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList()); // If it is a left join and right table is empty, we return left rows. if (hashCollection.isEmpty() && _joinType == JoinRelType.LEFT) { rows.add(joinRow(leftRow, null)); } else { // If it is other type of join. for (Object[] rightRow : hashCollection) { + // TODO: Optimize this to avoid unnecessary object copy. 
Object[] resultRow = joinRow(leftRow, rightRow); - if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch( - evaluator -> evaluator.apply(resultRow))) { + if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream() + .allMatch(evaluator -> evaluator.apply(resultRow))) { rows.add(resultRow); } } } } - return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW); } @@ -178,7 +181,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> { for (Object obj : leftRow) { resultRow[idx++] = obj; } - if (_joinType != JoinRelType.SEMI && rightRow != null) { + if (rightRow != null) { for (Object obj : rightRow) { resultRow[idx++] = obj; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java index 2fe9900bb8..df30f1b617 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java @@ -45,16 +45,12 @@ import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.roaringbitmap.RoaringBitmap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * {@code QueryDispatcher} dispatch a query to different workers. */ public class QueryDispatcher { - private static final Logger LOGGER = LoggerFactory.getLogger(QueryDispatcher.class); - private final Map<String, DispatchClient> _dispatchClientMap = new ConcurrentHashMap<>(); public QueryDispatcher() { @@ -68,8 +64,8 @@ public class QueryDispatcher { // run reduce stage and return result. 
MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId); MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(mailboxService, - queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), - requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(), mailboxService.getHostname(), + queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), requestId, + reduceNode.getSenderStageId(), reduceNode.getDataSchema(), mailboxService.getHostname(), mailboxService.getMailboxPort()); List<DataBlock> resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutNano); return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(), @@ -91,9 +87,8 @@ public class QueryDispatcher { int servicePort = serverInstance.getQueryServicePort(); int mailboxPort = serverInstance.getQueryMailboxPort(); DispatchClient client = getOrCreateDispatchClient(host, servicePort); - Worker.QueryResponse response = client.submit(Worker.QueryRequest.newBuilder() - .setStagePlan(QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, - serverInstance))) + Worker.QueryResponse response = client.submit(Worker.QueryRequest.newBuilder().setStagePlan( + QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, serverInstance))) .putMetadata("REQUEST_ID", String.valueOf(requestId)) .putMetadata("SERVER_INSTANCE_HOST", serverInstance.getHostname()) .putMetadata("SERVER_INSTANCE_PORT", String.valueOf(mailboxPort)).build()); @@ -132,8 +127,8 @@ public class QueryDispatcher { if (TransferableBlockUtils.isEndOfStream(transferableBlock) && transferableBlock.isErrorBlock()) { // TODO: we only received bubble up error from the execution stage tree. // TODO: query dispatch should also send cancel signal to the rest of the execution stage tree. 
- throw new RuntimeException("Received error query execution result block: " - + transferableBlock.getDataBlock().getExceptions()); + throw new RuntimeException( + "Received error query execution result block: " + transferableBlock.getDataBlock().getExceptions()); } if (transferableBlock.isNoOpBlock()) { continue; @@ -154,7 +149,6 @@ public class QueryDispatcher { for (DataBlock dataBlock : queryResult) { int numColumns = resultSchema.getColumnNames().length; int numRows = dataBlock.getNumberOfRows(); - DataSchema.ColumnDataType[] resultColumnDataTypes = resultSchema.getColumnDataTypes(); List<Object[]> rows = new ArrayList<>(dataBlock.getNumberOfRows()); if (numRows > 0) { RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns]; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java index 72f2c31f63..5434701974 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.List; import org.apache.calcite.sql.SqlKind; -import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.Operator; @@ -111,7 +110,7 @@ public class AggregateOperatorTest { DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); Mockito.when(_input.nextBlock()) - .thenReturn(block(inSchema, new Object[]{1, 1})) + .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1})) .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); DataSchema outSchema = new DataSchema(new 
String[]{"sum"}, new ColumnDataType[]{DOUBLE}); @@ -135,7 +134,7 @@ public class AggregateOperatorTest { DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); Mockito.when(_input.nextBlock()) - .thenReturn(block(inSchema, new Object[]{2, 1})) + .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); @@ -163,7 +162,7 @@ public class AggregateOperatorTest { DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); Mockito.when(_input.nextBlock()) - .thenReturn(block(inSchema, new Object[]{2, 3})) + .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); @@ -192,8 +191,8 @@ public class AggregateOperatorTest { DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); Mockito.when(_input.nextBlock()) - .thenReturn(block(inSchema, new Object[]{1, 1}, new Object[]{1, 1})) - .thenReturn(block(inSchema, new Object[]{1, 1})) + .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1}, new Object[]{1, 1})) + .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); AggregateOperator.Merger merger = Mockito.mock(AggregateOperator.Merger.class); @@ -260,7 +259,7 @@ public class AggregateOperatorTest { Mockito.when(_input.nextBlock()) // TODO: it is necessary to produce two values here, the operator only throws on second // (see the comment in Aggregate operator) - .thenReturn(block(inSchema, new Object[]{2, "foo"}, new Object[]{2, "foo"})) + .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"}, new Object[]{2, 
"foo"})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); @@ -275,10 +274,6 @@ public class AggregateOperatorTest { "expected it to fail with class cast exception"); } - private static TransferableBlock block(DataSchema schema, Object[]... rows) { - return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW); - } - private static RexExpression.FunctionCall getSum(RexExpression arg) { return new RexExpression.FunctionCall(SqlKind.SUM, FieldSpec.DataType.INT, "SUM", ImmutableList.of(arg)); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java index 0cf912c279..e88a2703bd 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java @@ -18,21 +18,51 @@ */ package org.apache.pinot.query.runtime.operator; +import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.sql.SqlKind; +import org.apache.pinot.common.datablock.MetadataBlock; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.stage.JoinNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import 
org.apache.pinot.spi.data.FieldSpec; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class HashJoinOperatorTest { + private AutoCloseable _mocks; + + @Mock + private Operator<TransferableBlock> _leftOperator; + + @Mock + private Operator<TransferableBlock> _rightOperator; + + @BeforeMethod + public void setUp() { + _mocks = MockitoAnnotations.openMocks(this); + } + + @AfterMethod + public void tearDown() + throws Exception { + _mocks.close(); + } + private static JoinNode.JoinKeys getJoinKeys(List<Integer> leftIdx, List<Integer> rightIdx) { FieldSelectionKeySelector leftSelect = new FieldSelectionKeySelector(leftIdx); FieldSelectionKeySelector rightSelect = new FieldSelectionKeySelector(rightIdx); @@ -40,84 +70,513 @@ public class HashJoinOperatorTest { } @Test - public void testHashJoinKeyCollisionInnerJoin() { - BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1); - BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1); + public void shouldHandleHashJoinKeyCollisionInnerJoin() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); List<RexExpression> joinClauses = new ArrayList<>(); - DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{ - DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, - DataSchema.ColumnDataType.STRING + 
Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER); + + TransferableBlock result = joinOnString.nextBlock(); + while (result.isNoOpBlock()) { + result = joinOnString.nextBlock(); + } + List<Object[]> resultRows = result.getContainer(); + List<Object[]> expectedRows = + Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "BB", 2, "BB"}, new Object[]{2, "BB", 3, "BB"}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); + Assert.assertEquals(resultRows.get(2), expectedRows.get(2)); + } + + @Test + public void shouldHandleInnerJoinOnInt() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING }); - HashJoinOperator join = - new HashJoinOperator(leftOperator, 
rightOperator, resultSchema, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), - joinClauses, JoinRelType.INNER); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_co2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER); + TransferableBlock result = joinOnInt.nextBlock(); + while (result.isNoOpBlock()) { + result = joinOnInt.nextBlock(); + } + List<Object[]> resultRows = result.getContainer(); + List<Object[]> expectedRows = Arrays.asList(new Object[]{2, "BB", 2, "Aa"}, new Object[]{2, "BB", 2, "BB"}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); + } + + @Test + public void shouldHandleJoinOnEmptySelector() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + 
Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_co2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, JoinRelType.INNER); + TransferableBlock result = joinOnInt.nextBlock(); + while (result.isNoOpBlock()) { + result = joinOnInt.nextBlock(); + } + List<Object[]> resultRows = result.getContainer(); + List<Object[]> expectedRows = + Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{1, "Aa", 2, "BB"}, new Object[]{1, "Aa", 3, "BB"}, + new Object[]{2, "BB", 2, "Aa"}, new Object[]{2, "BB", 2, "BB"}, new Object[]{2, "BB", 3, "BB"}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); + Assert.assertEquals(resultRows.get(2), expectedRows.get(2)); + Assert.assertEquals(resultRows.get(3), expectedRows.get(3)); + Assert.assertEquals(resultRows.get(4), expectedRows.get(4)); + Assert.assertEquals(resultRows.get(5), expectedRows.get(5)); + } + + @Test + public void shouldHandleLeftJoin() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + 
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "CC"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.LEFT); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); } List<Object[]> resultRows = result.getContainer(); - List<Object[]> expectedRows = - Arrays.asList(new Object[]{1, "Aa", 1, "Aa"}, new Object[]{2, "BB", 2, "BB"}, new Object[]{2, "BB", 3, "BB"}, - new Object[]{3, "BB", 2, "BB"}, new Object[]{3, "BB", 3, "BB"}); - Assert.assertEquals(expectedRows.size(), resultRows.size()); - Assert.assertEquals(expectedRows.get(0), resultRows.get(0)); - Assert.assertEquals(expectedRows.get(1), resultRows.get(1)); - Assert.assertEquals(expectedRows.get(2), resultRows.get(2)); - Assert.assertEquals(expectedRows.get(3), resultRows.get(3)); - Assert.assertEquals(expectedRows.get(4), resultRows.get(4)); + List<Object[]> expectedRows = 
Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "CC", null, null}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); } @Test - public void testInnerJoin() { - BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1); - BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_2); + public void shouldPassLeftTableEOS() { + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new Object[]{1, "CC"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER); - DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{ - DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, - DataSchema.ColumnDataType.STRING + TransferableBlock result = join.nextBlock(); + while (result.isNoOpBlock()) { + result = join.nextBlock(); + } + 
Assert.assertTrue(result.isEndOfStreamBlock()); + } + + @Test + public void shouldHandleLeftJoinOneToN() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING }); - HashJoinOperator join = - new HashJoinOperator(leftOperator, rightOperator, resultSchema, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), - joinClauses, JoinRelType.INNER); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new Object[]{1, "CC"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.LEFT); + + TransferableBlock result = join.nextBlock(); + while (result.isNoOpBlock()) { + result = join.nextBlock(); + } + List<Object[]> resultRows = result.getContainer(); + List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 1, "BB"}, new Object[]{1, "Aa", 1, "CC"}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + 
Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); + } + + @Test + public void shouldPassRightTableEOS() { + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new Object[]{1, "CC"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); } List<Object[]> resultRows = result.getContainer(); - Object[] expRow = new Object[]{1, "Aa", 2, "Aa"}; - List<Object[]> expectedRows = new ArrayList<>(); - expectedRows.add(expRow); - Assert.assertEquals(expectedRows.size(), resultRows.size()); - Assert.assertEquals(expectedRows.get(0), resultRows.get(0)); + Assert.assertTrue(resultRows.isEmpty()); } @Test - public void testLeftJoin() { - BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1); - BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_2); + public void shouldHandleInequiJoinOnString() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, 
new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + List<RexExpression> joinClauses = new ArrayList<>(); + List<RexExpression> functionOperands = new ArrayList<>(); + functionOperands.add(new RexExpression.InputRef(1)); + functionOperands.add(new RexExpression.InputRef(3)); + joinClauses.add( + new RexExpression.FunctionCall(SqlKind.NOT_EQUALS, FieldSpec.DataType.STRING, "NOT_EQUALS", functionOperands)); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, JoinRelType.INNER); + + TransferableBlock result = join.nextBlock(); + while (result.isNoOpBlock()) { + result = join.nextBlock(); + } + Assert.assertTrue(result.isErrorBlock()); + MetadataBlock errorBlock = (MetadataBlock) result.getDataBlock(); + Assert.assertTrue(errorBlock.getExceptions().get(1000).matches(".*notEquals.*")); + } + + @Test + public void shouldHandleInequiJoinOnInt() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{1, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + List<RexExpression> joinClauses = new ArrayList<>(); + List<RexExpression> functionOperands = new ArrayList<>(); + functionOperands.add(new RexExpression.InputRef(0)); + functionOperands.add(new RexExpression.InputRef(2)); + joinClauses.add( + new RexExpression.FunctionCall(SqlKind.NOT_EQUALS, FieldSpec.DataType.STRING, "NOT_EQUALS", functionOperands)); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, JoinRelType.INNER); + + TransferableBlock result = join.nextBlock(); + while (result.isNoOpBlock()) { + result = join.nextBlock(); + } + List<Object[]> resultRows = result.getContainer(); + List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "BB", 1, "BB"}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + Assert.assertEquals(resultRows.get(1), 
expectedRows.get(1)); + } + + @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RIGHT is not supported" + + ".*") + public void shouldThrowOnRightJoin() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.RIGHT); + } + + @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*SEMI is not " + + "supported.*") + public void shouldThrowOnSemiJoin() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + 
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.SEMI); + } + + @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*FULL is not supported.*") + public void shouldThrowOnFullJoin() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); 
List<RexExpression> joinClauses = new ArrayList<>(); DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING }); - HashJoinOperator join = - new HashJoinOperator(leftOperator, rightOperator, resultSchema, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), - joinClauses, JoinRelType.LEFT); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.FULL); + } + + @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*ANTI is not supported.*") + public void shouldThrowOnAntiJoin() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new 
HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.ANTI); + } + + @Test + public void shouldPropagateRightTableError() { + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new Object[]{1, "CC"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()) + .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("testInnerJoinRightError"))); + + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER); + + TransferableBlock result = join.nextBlock(); + while (result.isNoOpBlock()) { + result = join.nextBlock(); + } + Assert.assertTrue(result.isErrorBlock()); + Assert.assertTrue(result.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE) + .matches("testInnerJoinRightError")); + } + + @Test + public void shouldPropagateLeftTableError() { + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new Object[]{1, "CC"}, new Object[]{3, 
"BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("testInnerJoinLeftError"))); + + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); } + Assert.assertTrue(result.isErrorBlock()); + Assert.assertTrue( + result.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).matches("testInnerJoinLeftError")); + } + + @Test + public void shouldHandleNoOpBlock() { + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()) + .thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{2, "CC"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"})) + .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()) + .thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"})) + .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + 
List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema, + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER); + + TransferableBlock result = join.nextBlock(); // first no-op consumes first right data block. + Assert.assertTrue(result.isNoOpBlock()); + result = join.nextBlock(); // second no-op consumes no-op right block. + Assert.assertTrue(result.isNoOpBlock()); + result = join.nextBlock(); // third no-op consumes another right data block. + Assert.assertTrue(result.isNoOpBlock()); + result = join.nextBlock(); // fourth no-op consumes another right data block. + Assert.assertTrue(result.isNoOpBlock()); + result = join.nextBlock(); // build result using the first left block List<Object[]> resultRows = result.getContainer(); - List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "BB", null, null}, - new Object[]{3, "BB", null, null}); - Assert.assertEquals(expectedRows.size(), resultRows.size()); - Assert.assertEquals(expectedRows.get(0), resultRows.get(0)); - Assert.assertEquals(expectedRows.get(1), resultRows.get(1)); - Assert.assertEquals(expectedRows.get(2), resultRows.get(2)); + List<Object[]> expectedRows = ImmutableList.of(new Object[]{2, "BB", 2, "Aa"}); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + result = join.nextBlock(); // second left block is no-op + Assert.assertTrue(result.isNoOpBlock()); + result = join.nextBlock(); // third left block consumes some extra data + expectedRows = ImmutableList.of(new Object[]{2, "CC", 2, "Aa"}); +
resultRows = result.getContainer(); + Assert.assertEquals(resultRows.size(), expectedRows.size()); + Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); + result = join.nextBlock(); // last one is EOS. + Assert.assertTrue(result.isEndOfStreamBlock()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org