This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ec47f5af23 [multistage] [testing] Add join operator unit test (#9775)
ec47f5af23 is described below

commit ec47f5af234c22734590dcd21c990d04b62d129a
Author: Yao Liu <y...@startree.ai>
AuthorDate: Thu Nov 17 19:35:48 2022 -0800

    [multistage] [testing] Add join operator unit test (#9775)
---
 .../query/runtime/operator/HashJoinOperator.java   |  55 ++-
 .../pinot/query/service/QueryDispatcher.java       |  18 +-
 .../runtime/operator/AggregateOperatorTest.java    |  17 +-
 .../runtime/operator/HashJoinOperatorTest.java     | 549 +++++++++++++++++++--
 4 files changed, 545 insertions(+), 94 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index ce0f19177f..a15afee20e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.clearspring.analytics.util.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -48,9 +51,10 @@ import 
org.apache.pinot.query.runtime.operator.operands.FilterOperand;
  * We currently support left join, inner join and semi join.
  * The output is in the format of [left_row, right_row]
  */
+// TODO: Move inequi out of hashjoin. 
(https://github.com/apache/pinot/issues/9728)
 public class HashJoinOperator extends BaseOperator<TransferableBlock> {
-  private static final String EXPLAIN_NAME = "BROADCAST_JOIN";
-
+  private static final String EXPLAIN_NAME = "HASH_JOIN";
+  private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = 
ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT);
   private final HashMap<Key, List<Object[]>> _broadcastHashTable;
   private final Operator<TransferableBlock> _leftTableOperator;
   private final Operator<TransferableBlock> _rightTableOperator;
@@ -63,13 +67,14 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
   private KeySelector<Object[], Object[]> _leftKeySelector;
   private KeySelector<Object[], Object[]> _rightKeySelector;
 
-  // TODO: Fix inequi join bug. (https://github.com/apache/pinot/issues/9728)
-  // TODO: Double check semi join logic.
   public HashJoinOperator(Operator<TransferableBlock> leftTableOperator, 
Operator<TransferableBlock> rightTableOperator,
       DataSchema outputSchema, JoinNode.JoinKeys joinKeys, List<RexExpression> 
joinClauses, JoinRelType joinType) {
-    // TODO: Handle the case where _leftKeySelector and _rightKeySelector 
could be null.
+    Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(joinType),
+        "Join type: " + joinType + " is not supported!");
     _leftKeySelector = joinKeys.getLeftJoinKeySelector();
     _rightKeySelector = joinKeys.getRightJoinKeySelector();
+    Preconditions.checkState(_leftKeySelector != null, "LeftKeySelector for 
join cannot be null");
+    Preconditions.checkState(_rightKeySelector != null, "RightKeySelector for 
join cannot be null");
     _leftTableOperator = leftTableOperator;
     _rightTableOperator = rightTableOperator;
     _resultSchema = outputSchema;
@@ -98,19 +103,17 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    if (!_isHashTableBuilt) {
-      // Build JOIN hash table
-      buildBroadcastHashTable();
-    }
-
-    if (_upstreamErrorBlock != null) {
-      return _upstreamErrorBlock;
-    } else if (!_isHashTableBuilt) {
-      return TransferableBlockUtils.getNoOpTransferableBlock();
-    }
-
-    // JOIN each left block with the right block.
     try {
+      if (!_isHashTableBuilt) {
+        // Build JOIN hash table
+        buildBroadcastHashTable();
+      }
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      } else if (!_isHashTableBuilt) {
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+      // JOIN each left block with the right block.
       return buildJoinedDataBlock(_leftTableOperator.nextBlock());
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
@@ -145,30 +148,30 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
     if (leftBlock.isErrorBlock()) {
       _upstreamErrorBlock = leftBlock;
       return _upstreamErrorBlock;
-    } else if (TransferableBlockUtils.isEndOfStream(leftBlock) || 
TransferableBlockUtils.isNoOpBlock(leftBlock)) {
+    } else if (TransferableBlockUtils.isNoOpBlock(leftBlock) || 
TransferableBlockUtils.isEndOfStream(leftBlock)) {
       return leftBlock;
     }
-
     List<Object[]> rows = new ArrayList<>();
-    List<Object[]> container = leftBlock.getContainer();
+    List<Object[]> container = leftBlock.isEndOfStreamBlock() ? new 
ArrayList<>() : leftBlock.getContainer();
     for (Object[] leftRow : container) {
-      List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(
-          new Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList());
+      // NOTE: Empty key selector will always give same hash code.
+      List<Object[]> hashCollection =
+          _broadcastHashTable.getOrDefault(new 
Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList());
       // If it is a left join and right table is empty, we return left rows.
       if (hashCollection.isEmpty() && _joinType == JoinRelType.LEFT) {
         rows.add(joinRow(leftRow, null));
       } else {
         // If it is other type of join.
         for (Object[] rightRow : hashCollection) {
+          // TODO: Optimize this to avoid unnecessary object copy.
           Object[] resultRow = joinRow(leftRow, rightRow);
-          if (_joinClauseEvaluators.isEmpty() || 
_joinClauseEvaluators.stream().allMatch(
-            evaluator -> evaluator.apply(resultRow))) {
+          if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream()
+              .allMatch(evaluator -> evaluator.apply(resultRow))) {
             rows.add(resultRow);
           }
         }
       }
     }
-
     return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
   }
 
@@ -178,7 +181,7 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
     for (Object obj : leftRow) {
       resultRow[idx++] = obj;
     }
-    if (_joinType != JoinRelType.SEMI && rightRow != null) {
+    if (rightRow != null) {
       for (Object obj : rightRow) {
         resultRow[idx++] = obj;
       }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 2fe9900bb8..df30f1b617 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -45,16 +45,12 @@ import 
org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.roaringbitmap.RoaringBitmap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * {@code QueryDispatcher} dispatch a query to different workers.
  */
 public class QueryDispatcher {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryDispatcher.class);
-
   private final Map<String, DispatchClient> _dispatchClientMap = new 
ConcurrentHashMap<>();
 
   public QueryDispatcher() {
@@ -68,8 +64,8 @@ public class QueryDispatcher {
     // run reduce stage and return result.
     MailboxReceiveNode reduceNode = (MailboxReceiveNode) 
queryPlan.getQueryStageMap().get(reduceStageId);
     MailboxReceiveOperator mailboxReceiveOperator = 
createReduceStageOperator(mailboxService,
-        
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
-        requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(), 
mailboxService.getHostname(),
+        
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
 requestId,
+        reduceNode.getSenderStageId(), reduceNode.getDataSchema(), 
mailboxService.getHostname(),
         mailboxService.getMailboxPort());
     List<DataBlock> resultDataBlocks = 
reduceMailboxReceive(mailboxReceiveOperator, timeoutNano);
     return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
@@ -91,9 +87,8 @@ public class QueryDispatcher {
           int servicePort = serverInstance.getQueryServicePort();
           int mailboxPort = serverInstance.getQueryMailboxPort();
           DispatchClient client = getOrCreateDispatchClient(host, servicePort);
-          Worker.QueryResponse response = 
client.submit(Worker.QueryRequest.newBuilder()
-              
.setStagePlan(QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan,
 stageId,
-                  serverInstance)))
+          Worker.QueryResponse response = 
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
+                  
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, 
serverInstance)))
               .putMetadata("REQUEST_ID", String.valueOf(requestId))
               .putMetadata("SERVER_INSTANCE_HOST", 
serverInstance.getHostname())
               .putMetadata("SERVER_INSTANCE_PORT", 
String.valueOf(mailboxPort)).build());
@@ -132,8 +127,8 @@ public class QueryDispatcher {
       if (TransferableBlockUtils.isEndOfStream(transferableBlock) && 
transferableBlock.isErrorBlock()) {
         // TODO: we only received bubble up error from the execution stage 
tree.
         // TODO: query dispatch should also send cancel signal to the rest of 
the execution stage tree.
-          throw new RuntimeException("Received error query execution result 
block: "
-              + transferableBlock.getDataBlock().getExceptions());
+        throw new RuntimeException(
+            "Received error query execution result block: " + 
transferableBlock.getDataBlock().getExceptions());
       }
       if (transferableBlock.isNoOpBlock()) {
         continue;
@@ -154,7 +149,6 @@ public class QueryDispatcher {
     for (DataBlock dataBlock : queryResult) {
       int numColumns = resultSchema.getColumnNames().length;
       int numRows = dataBlock.getNumberOfRows();
-      DataSchema.ColumnDataType[] resultColumnDataTypes = 
resultSchema.getColumnDataTypes();
       List<Object[]> rows = new ArrayList<>(dataBlock.getNumberOfRows());
       if (numRows > 0) {
         RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 72f2c31f63..5434701974 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.Operator;
@@ -111,7 +110,7 @@ public class AggregateOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
     Mockito.when(_input.nextBlock())
-        .thenReturn(block(inSchema, new Object[]{1, 1}))
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1}))
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
@@ -135,7 +134,7 @@ public class AggregateOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
     Mockito.when(_input.nextBlock())
-        .thenReturn(block(inSchema, new Object[]{2, 1}))
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
@@ -163,7 +162,7 @@ public class AggregateOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
     Mockito.when(_input.nextBlock())
-        .thenReturn(block(inSchema, new Object[]{2, 3}))
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
@@ -192,8 +191,8 @@ public class AggregateOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
     Mockito.when(_input.nextBlock())
-        .thenReturn(block(inSchema, new Object[]{1, 1}, new Object[]{1, 1}))
-        .thenReturn(block(inSchema, new Object[]{1, 1}))
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1}, new 
Object[]{1, 1}))
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     AggregateOperator.Merger merger = 
Mockito.mock(AggregateOperator.Merger.class);
@@ -260,7 +259,7 @@ public class AggregateOperatorTest {
     Mockito.when(_input.nextBlock())
         // TODO: it is necessary to produce two values here, the operator only 
throws on second
         // (see the comment in Aggregate operator)
-        .thenReturn(block(inSchema, new Object[]{2, "foo"}, new Object[]{2, 
"foo"}))
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"}, 
new Object[]{2, "foo"}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
@@ -275,10 +274,6 @@ public class AggregateOperatorTest {
         "expected it to fail with class cast exception");
   }
 
-  private static TransferableBlock block(DataSchema schema, Object[]... rows) {
-    return new TransferableBlock(Arrays.asList(rows), schema, 
DataBlock.Type.ROW);
-  }
-
   private static RexExpression.FunctionCall getSum(RexExpression arg) {
     return new RexExpression.FunctionCall(SqlKind.SUM, FieldSpec.DataType.INT, 
"SUM", ImmutableList.of(arg));
   }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 0cf912c279..e88a2703bd 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -18,21 +18,51 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 
 public class HashJoinOperatorTest {
+  private AutoCloseable _mocks;
+
+  @Mock
+  private Operator<TransferableBlock> _leftOperator;
+
+  @Mock
+  private Operator<TransferableBlock> _rightOperator;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
   private static JoinNode.JoinKeys getJoinKeys(List<Integer> leftIdx, 
List<Integer> rightIdx) {
     FieldSelectionKeySelector leftSelect = new 
FieldSelectionKeySelector(leftIdx);
     FieldSelectionKeySelector rightSelect = new 
FieldSelectionKeySelector(rightIdx);
@@ -40,84 +70,513 @@ public class HashJoinOperatorTest {
   }
 
   @Test
-  public void testHashJoinKeyCollisionInnerJoin() {
-    BaseOperator<TransferableBlock> leftOperator = 
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
-    BaseOperator<TransferableBlock> rightOperator = 
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
+  public void shouldHandleHashJoinKeyCollisionInnerJoin() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
     List<RexExpression> joinClauses = new ArrayList<>();
-    DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", 
"bar"}, new DataSchema.ColumnDataType[]{
-        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
-        DataSchema.ColumnDataType.STRING
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, 
new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new 
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", 
"string_col1", "int_col2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, 
_rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, 
JoinRelType.INNER);
+
+    TransferableBlock result = joinOnString.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = joinOnString.nextBlock();
+    }
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows =
+        Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "BB", 2, 
"BB"}, new Object[]{2, "BB", 3, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+    Assert.assertEquals(resultRows.get(2), expectedRows.get(2));
+  }
+
+  @Test
+  public void shouldHandleInnerJoinOnInt() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join =
-        new HashJoinOperator(leftOperator, rightOperator, resultSchema, 
getJoinKeys(Arrays.asList(1), Arrays.asList(1)),
-            joinClauses, JoinRelType.INNER);
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, 
new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new 
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", 
"string_col1", "int_col2", "string_co2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, 
_rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, 
JoinRelType.INNER);
+    TransferableBlock result = joinOnInt.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = joinOnInt.nextBlock();
+    }
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{2, "BB", 2, 
"Aa"}, new Object[]{2, "BB", 2, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+  }
+
+  @Test
+  public void shouldHandleJoinOnEmptySelector() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, 
new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new 
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", 
"string_col1", "int_col2", "string_co2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, 
_rightOperator, resultSchema,
+        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, 
JoinRelType.INNER);
+    TransferableBlock result = joinOnInt.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = joinOnInt.nextBlock();
+    }
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows =
+        Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{1, "Aa", 2, 
"BB"}, new Object[]{1, "Aa", 3, "BB"},
+            new Object[]{2, "BB", 2, "Aa"}, new Object[]{2, "BB", 2, "BB"}, 
new Object[]{2, "BB", 3, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+    Assert.assertEquals(resultRows.get(2), expectedRows.get(2));
+    Assert.assertEquals(resultRows.get(3), expectedRows.get(3));
+    Assert.assertEquals(resultRows.get(4), expectedRows.get(4));
+    Assert.assertEquals(resultRows.get(5), expectedRows.get(5));
+  }
+
+  @Test
+  public void shouldHandleLeftJoin() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, 
new Object[]{2, "CC"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new 
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", 
"string_col1", "int_co2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, 
JoinRelType.LEFT);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
     }
     List<Object[]> resultRows = result.getContainer();
-    List<Object[]> expectedRows =
-        Arrays.asList(new Object[]{1, "Aa", 1, "Aa"}, new Object[]{2, "BB", 2, 
"BB"}, new Object[]{2, "BB", 3, "BB"},
-            new Object[]{3, "BB", 2, "BB"}, new Object[]{3, "BB", 3, "BB"});
-    Assert.assertEquals(expectedRows.size(), resultRows.size());
-    Assert.assertEquals(expectedRows.get(0), resultRows.get(0));
-    Assert.assertEquals(expectedRows.get(1), resultRows.get(1));
-    Assert.assertEquals(expectedRows.get(2), resultRows.get(2));
-    Assert.assertEquals(expectedRows.get(3), resultRows.get(3));
-    Assert.assertEquals(expectedRows.get(4), resultRows.get(4));
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2, 
"Aa"}, new Object[]{2, "CC", null, null});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
   }
 
   @Test
-  public void testInnerJoin() {
-    BaseOperator<TransferableBlock> leftOperator = 
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
-    BaseOperator<TransferableBlock> rightOperator = 
OperatorTestUtil.getOperator(OperatorTestUtil.OP_2);
+  public void shouldPassLeftTableEOS() {
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    
Mockito.when(_leftOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new 
Object[]{1, "CC"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
     List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", 
"string_col1", "int_co2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, 
JoinRelType.INNER);
 
-    DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", 
"bar"}, new DataSchema.ColumnDataType[]{
-        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
-        DataSchema.ColumnDataType.STRING
+    TransferableBlock result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
+    Assert.assertTrue(result.isEndOfStreamBlock());
+  }
+
+  @Test
+  public void shouldHandleLeftJoinOneToN() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join =
-        new HashJoinOperator(leftOperator, rightOperator, resultSchema, 
getJoinKeys(Arrays.asList(1), Arrays.asList(1)),
-            joinClauses, JoinRelType.INNER);
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    
Mockito.when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.block(leftSchema,
 new Object[]{1, "Aa"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new 
Object[]{1, "CC"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", 
"string_col1", "int_co2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, 
JoinRelType.LEFT);
+
+    TransferableBlock result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 1, 
"BB"}, new Object[]{1, "Aa", 1, "CC"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+  }
+
+  @Test
+  public void shouldPassRightTableEOS() {
+    // The left side yields rows but the right side ends immediately, so the INNER join must produce no rows.
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(leftSchema, new Object[]{1, "BB"}, new Object[]{1, "CC"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
     }
     List<Object[]> resultRows = result.getContainer();
-    Object[] expRow = new Object[]{1, "Aa", 2, "Aa"};
-    List<Object[]> expectedRows = new ArrayList<>();
-    expectedRows.add(expRow);
-    Assert.assertEquals(expectedRows.size(), resultRows.size());
-    Assert.assertEquals(expectedRows.get(0), resultRows.get(0));
+    Assert.assertTrue(resultRows.isEmpty());
   }
 
   @Test
-  public void testLeftJoin() {
-    BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
-    BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_2);
+  public void shouldHandleInequiJoinOnString() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // Join condition: left.string_col != right.string_col (combined-row indices 1 and 3), with no equi-join keys.
+    List<RexExpression> joinClauses = new ArrayList<>();
+    List<RexExpression> functionOperands = new ArrayList<>();
+    functionOperands.add(new RexExpression.InputRef(1));
+    functionOperands.add(new RexExpression.InputRef(3));
+    joinClauses.add(
+        new RexExpression.FunctionCall(SqlKind.NOT_EQUALS, FieldSpec.DataType.STRING, "NOT_EQUALS", functionOperands));
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, JoinRelType.INNER);
+
+    TransferableBlock result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
+    // NOT_EQUALS over STRING operands is expected to fail at execution time: the operator must return an
+    // error block whose message mentions "notEquals".
+    Assert.assertTrue(result.isErrorBlock());
+    MetadataBlock errorBlock = (MetadataBlock) result.getDataBlock();
+    Assert.assertTrue(errorBlock.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).matches(".*notEquals.*"));
+  }
+
+  @Test
+  public void shouldHandleInequiJoinOnInt() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{1, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // Join condition: left.int_col != right.int_col (combined-row indices 0 and 2), with no equi-join keys.
+    List<RexExpression> joinClauses = new ArrayList<>();
+    List<RexExpression> functionOperands = new ArrayList<>();
+    functionOperands.add(new RexExpression.InputRef(0));
+    functionOperands.add(new RexExpression.InputRef(2));
+    joinClauses.add(
+        new RexExpression.FunctionCall(SqlKind.NOT_EQUALS, FieldSpec.DataType.STRING, "NOT_EQUALS", functionOperands));
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, JoinRelType.INNER);
+
+    TransferableBlock result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
+    // Each left row is kept only against the right row whose int_col differs from its own.
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "BB", 1, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RIGHT is not supported"
+      + ".*")
+  public void shouldThrowOnRightJoin() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+        DataSchema.ColumnDataType.STRING
+    });
+    // Constructing the operator must throw; the instance is never used, so it is not assigned.
+    new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.RIGHT);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*SEMI is not "
+      + "supported.*")
+  public void shouldThrowOnSemiJoin() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+        DataSchema.ColumnDataType.STRING
+    });
+    // Constructing the operator must throw; the instance is never used, so it is not assigned.
+    new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.SEMI);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*FULL is not supported.*")
+  public void shouldThrowOnFullJoin() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     List<RexExpression> joinClauses = new ArrayList<>();
     DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join =
-        new HashJoinOperator(leftOperator, rightOperator, resultSchema, getJoinKeys(Arrays.asList(1), Arrays.asList(1)),
-            joinClauses, JoinRelType.LEFT);
+    // Constructing the operator must throw; the instance is never used, so it is not assigned.
+    new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.FULL);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*ANTI is not supported.*")
+  public void shouldThrowOnAntiJoin() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+        DataSchema.ColumnDataType.STRING
+    });
+    // Constructing the operator must throw; the instance is never used, so it is not assigned.
+    new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.ANTI);
+  }
+
+  @Test
+  public void shouldPropagateRightTableError() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(leftSchema, new Object[]{1, "BB"}, new Object[]{1, "CC"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    // The right side fails on its first block; the join must surface that error message unchanged.
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("testInnerJoinRightError")));
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
+
+    TransferableBlock result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
+    Assert.assertTrue(result.isErrorBlock());
+    Assert.assertEquals(result.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE),
+        "testInnerJoinRightError");
+  }
+
+  @Test
+  public void shouldPropagateLeftTableError() {
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new Object[]{1, "CC"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    // The right side completes normally; the left side fails on its first block and that error must surface.
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("testInnerJoinLeftError")));
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
     }
+    Assert.assertTrue(result.isErrorBlock());
+    Assert.assertEquals(result.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE),
+        "testInnerJoinLeftError");
+  }
+
+  @Test
+  public void shouldHandleNoOpBlock() {
+    // Both inputs share the same two-column schema.
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.block(schema, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{2, "CC"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "BB"}))
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{2, "Aa"}))
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
+
+    // The right side is consumed first; each of its blocks (data or no-op) makes the join return a no-op.
+    TransferableBlock result = join.nextBlock(); // 1st call consumes right data block (1, "BB").
+    Assert.assertTrue(result.isNoOpBlock());
+    result = join.nextBlock(); // 2nd call consumes the first no-op right block.
+    Assert.assertTrue(result.isNoOpBlock());
+    result = join.nextBlock(); // 3rd call consumes right data block (2, "Aa").
+    Assert.assertTrue(result.isNoOpBlock());
+    result = join.nextBlock(); // 4th call consumes the second no-op right block.
+    Assert.assertTrue(result.isNoOpBlock());
+    result = join.nextBlock(); // 5th call reaches right EOS and joins the first left block (2, "BB").
     List<Object[]> resultRows = result.getContainer();
-    List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "BB", null, null},
-        new Object[]{3, "BB", null, null});
-    Assert.assertEquals(expectedRows.size(), resultRows.size());
-    Assert.assertEquals(expectedRows.get(0), resultRows.get(0));
-    Assert.assertEquals(expectedRows.get(1), resultRows.get(1));
-    Assert.assertEquals(expectedRows.get(2), resultRows.get(2));
+    List<Object[]> expectedRows = ImmutableList.of(new Object[]{2, "BB", 2, "Aa"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    result = join.nextBlock(); // 6th call: the second left block is a no-op.
+    Assert.assertTrue(result.isNoOpBlock());
+    result = join.nextBlock(); // 7th call joins the third left block (2, "CC") against right row (2, "Aa").
+    expectedRows = ImmutableList.of(new Object[]{2, "CC", 2, "Aa"});
+    resultRows = result.getContainer();
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    result = join.nextBlock(); // Last call: the left side is exhausted, so the join emits EOS.
+    Assert.assertTrue(result.isEndOfStreamBlock());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to