This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 11602800d9 adding multi-column index support (#8937)
11602800d9 is described below

commit 11602800d9d38c8738f356e808dacc4c74823cd2
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Jun 28 14:39:55 2022 -0700

    adding multi-column index support (#8937)
    
    also support multi-column JOIN
    
    also fix hash distribution rule
    
    fix checkstyle
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../apache/pinot/query/planner/PlannerUtils.java   | 35 ++++++++++++++++++
 .../query/planner/logical/RelToStageConverter.java | 24 +++++-------
 .../partitioning/FieldSelectionKeySelector.java    | 43 ++++++++++++++++++----
 .../query/planner/partitioning/KeySelector.java    |  2 +
 .../apache/pinot/query/planner/stage/JoinNode.java |  8 ++--
 .../pinot/query/planner/stage/MailboxSendNode.java |  6 +--
 .../query/rules/PinotExchangeNodeInsertRule.java   | 13 +++----
 .../pinot/query/QueryEnvironmentTestBase.java      |  1 +
 .../query/runtime/operator/HashJoinOperator.java   | 11 +++---
 .../runtime/operator/MailboxSendOperator.java      | 17 +++------
 .../pinot/query/runtime/QueryRunnerTest.java       |  4 ++
 11 files changed, 111 insertions(+), 53 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
index 43ab79d6bc..d1710a486b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
@@ -18,6 +18,16 @@
  */
 package org.apache.pinot.query.planner;
 
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+
+
 /**
  * Utilities used by planner.
  */
@@ -26,6 +36,31 @@ public class PlannerUtils {
     // do not instantiate.
   }
 
+  public static List<List<Integer>> parseJoinConditions(RexCall joinCondition, int leftNodeOffset) {
+    switch (joinCondition.getOperator().getKind()) {
+      case EQUALS:
+        RexNode left = joinCondition.getOperands().get(0);
+        RexNode right = joinCondition.getOperands().get(1);
+        Preconditions.checkState(left instanceof RexInputRef, "only reference supported");
+        Preconditions.checkState(right instanceof RexInputRef, "only reference supported");
+        return Arrays.asList(Collections.singletonList(((RexInputRef) left).getIndex()),
+            Collections.singletonList(((RexInputRef) right).getIndex() - leftNodeOffset));
+      case AND:
+        List<List<Integer>> predicateColumns = new ArrayList<>(2);
+        predicateColumns.add(new ArrayList<>());
+        predicateColumns.add(new ArrayList<>());
+        for (RexNode operand : joinCondition.getOperands()) {
+          Preconditions.checkState(operand instanceof RexCall);
+          List<List<Integer>> subPredicate = parseJoinConditions((RexCall) operand, leftNodeOffset);
+          predicateColumns.get(0).addAll(subPredicate.get(0));
+          predicateColumns.get(1).addAll(subPredicate.get(1));
+        }
+        return predicateColumns;
+      default:
+        throw new UnsupportedOperationException("Only equality JOIN conditions are supported.");
+    }
+  }
+
   public static boolean isRootStage(int stageId) {
     return stageId == 0;
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index 297dce68e5..bc6d7dc4ca 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -28,11 +28,9 @@ import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalTableScan;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
@@ -89,19 +87,15 @@ public final class RelToStageConverter {
 
   private static StageNode convertLogicalJoin(LogicalJoin node, int 
currentStageId) {
     JoinRelType joinType = node.getJoinType();
+    Preconditions.checkState(node.getCondition() instanceof RexCall);
     RexCall joinCondition = (RexCall) node.getCondition();
-    Preconditions.checkState(
-        joinCondition.getOperator().getKind().equals(SqlKind.EQUALS) && 
joinCondition.getOperands().size() == 2,
-        "only equality JOIN is supported");
-    Preconditions.checkState(joinCondition.getOperands().get(0) instanceof 
RexInputRef, "only reference supported");
-    Preconditions.checkState(joinCondition.getOperands().get(1) instanceof 
RexInputRef, "only reference supported");
-    RelDataType leftRowType = node.getLeft().getRowType();
-    RelDataType rightRowType = node.getRight().getRowType();
-    int leftOperandIndex = ((RexInputRef) 
joinCondition.getOperands().get(0)).getIndex();
-    int rightOperandIndex = ((RexInputRef) 
joinCondition.getOperands().get(1)).getIndex();
-    FieldSelectionKeySelector leftFieldSelectionKeySelector = new 
FieldSelectionKeySelector(leftOperandIndex);
-    FieldSelectionKeySelector rightFieldSelectionKeySelector =
-          new FieldSelectionKeySelector(rightOperandIndex - 
leftRowType.getFieldNames().size());
+
+    // Parse out all equality JOIN conditions
+    int leftNodeOffset = node.getLeft().getRowType().getFieldList().size();
+    List<List<Integer>> predicateColumns = PlannerUtils.parseJoinConditions(joinCondition, leftNodeOffset);
+
+    FieldSelectionKeySelector leftFieldSelectionKeySelector = new FieldSelectionKeySelector(predicateColumns.get(0));
+    FieldSelectionKeySelector rightFieldSelectionKeySelector = new FieldSelectionKeySelector(predicateColumns.get(1));
     return new JoinNode(currentStageId, joinType, 
Collections.singletonList(new JoinNode.JoinClause(
         leftFieldSelectionKeySelector, rightFieldSelectionKeySelector)));
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
index 14f263c44f..674cc8e2a2 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
@@ -18,30 +18,59 @@
  */
 package org.apache.pinot.query.planner.partitioning;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
 /**
  * The {@code FieldSelectionKeySelector} simply extract a column value out from a row array {@link Object[]}.
  */
-public class FieldSelectionKeySelector implements KeySelector<Object[], Object> {
+public class FieldSelectionKeySelector implements KeySelector<Object[], Object[]> {
 
   @ProtoProperties
-  private int _columnIndex;
+  private List<Integer> _columnIndices;
 
   public FieldSelectionKeySelector() {
   }
 
   public FieldSelectionKeySelector(int columnIndex) {
-    _columnIndex = columnIndex;
+    _columnIndices = Collections.singletonList(columnIndex);
   }
 
-  public int getColumnIndex() {
-    return _columnIndex;
+  public FieldSelectionKeySelector(List<Integer> columnIndices) {
+    _columnIndices = new ArrayList<>();
+    _columnIndices.addAll(columnIndices);
+  }
+
+  public FieldSelectionKeySelector(int... columnIndices) {
+    _columnIndices = new ArrayList<>();
+    for (int columnIndex : columnIndices) {
+      _columnIndices.add(columnIndex);
+    }
+  }
+
+  public List<Integer> getColumnIndices() {
+    return _columnIndices;
+  }
+
+  @Override
+  public Object[] getKey(Object[] input) {
+    Object[] key = new Object[_columnIndices.size()];
+    for (int i = 0; i < _columnIndices.size(); i++) {
+      key[i] = input[_columnIndices.get(i)];
+    }
+    return key;
   }
 
   @Override
-  public Object getKey(Object[] input) {
-    return input[_columnIndex];
+  public int computeHash(Object[] input) {
+    HashCodeBuilder hashCodeBuilder = new HashCodeBuilder();
+    for (int columnIndex : _columnIndices) {
+      hashCodeBuilder.append(input[columnIndex]);
+    }
+    return hashCodeBuilder.toHashCode();
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
index e6b6e598a2..ea96e4b1d6 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
@@ -35,4 +35,6 @@ public interface KeySelector<IN, OUT> {
    * @return the key of the input data.
    */
   OUT getKey(IN input);
+
+  int computeHash(IN input);
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
index 223f0addee..0f9e007871 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
@@ -51,9 +51,9 @@ public class JoinNode extends AbstractStageNode {
 
   public static class JoinClause {
     @ProtoProperties
-    private KeySelector<Object[], Object> _leftJoinKeySelector;
+    private KeySelector<Object[], Object[]> _leftJoinKeySelector;
     @ProtoProperties
-    private KeySelector<Object[], Object> _rightJoinKeySelector;
+    private KeySelector<Object[], Object[]> _rightJoinKeySelector;
 
     public JoinClause() {
     }
@@ -63,11 +63,11 @@ public class JoinNode extends AbstractStageNode {
       _rightJoinKeySelector = rightKeySelector;
     }
 
-    public KeySelector<Object[], Object> getLeftJoinKeySelector() {
+    public KeySelector<Object[], Object[]> getLeftJoinKeySelector() {
       return _leftJoinKeySelector;
     }
 
-    public KeySelector<Object[], Object> getRightJoinKeySelector() {
+    public KeySelector<Object[], Object[]> getRightJoinKeySelector() {
       return _rightJoinKeySelector;
     }
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
index 2d7eb816f7..1400b61f82 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
@@ -30,7 +30,7 @@ public class MailboxSendNode extends AbstractStageNode {
   @ProtoProperties
   private RelDistribution.Type _exchangeType;
   @ProtoProperties
-  private KeySelector<Object[], Object> _partitionKeySelector;
+  private KeySelector<Object[], Object[]> _partitionKeySelector;
 
   public MailboxSendNode(int stageId) {
     super(stageId);
@@ -43,7 +43,7 @@ public class MailboxSendNode extends AbstractStageNode {
   }
 
   public MailboxSendNode(int stageId, int receiverStageId,
-      RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], 
Object> partitionKeySelector) {
+      RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], 
Object[]> partitionKeySelector) {
     super(stageId);
     _receiverStageId = receiverStageId;
     _exchangeType = exchangeType;
@@ -58,7 +58,7 @@ public class MailboxSendNode extends AbstractStageNode {
     return _exchangeType;
   }
 
-  public KeySelector<Object[], Object> getPartitionKeySelector() {
+  public KeySelector<Object[], Object[]> getPartitionKeySelector() {
     return _partitionKeySelector;
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
index e7ef083ded..2a1b740669 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.query.rules;
 
 import com.google.common.collect.ImmutableList;
-import java.util.Collections;
 import java.util.List;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
@@ -33,8 +32,8 @@ import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.hints.PinotRelationalHints;
 
 
@@ -72,13 +71,13 @@ public class PinotExchangeNodeInsertRule extends RelOptRule {
     RelNode rightExchange;
     List<RelHint> hints = join.getHints();
     if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
-      int leftOperandIndex = ((RexInputRef) ((RexCall) 
join.getCondition()).getOperands().get(0)).getIndex();
-      int rightOperandIndex = ((RexInputRef) ((RexCall) 
join.getCondition()).getOperands().get(1)).getIndex()
-          - join.getLeft().getRowType().getFieldNames().size();
+      RexCall joinCondition = (RexCall) join.getCondition();
+      int leftNodeOffset = join.getLeft().getRowType().getFieldNames().size();
+      List<List<Integer>> conditions = PlannerUtils.parseJoinConditions(joinCondition, leftNodeOffset);
       leftExchange = LogicalExchange.create(leftInput,
-          RelDistributions.hash(Collections.singletonList(leftOperandIndex)));
+          RelDistributions.hash(conditions.get(0)));
       rightExchange = LogicalExchange.create(rightInput,
-          RelDistributions.hash(Collections.singletonList(rightOperandIndex)));
+          RelDistributions.hash(conditions.get(1)));
     } else { // if (hints.contains(PinotRelationalHints.USE_BROADCAST_JOIN))
       leftExchange = LogicalExchange.create(leftInput, 
RelDistributions.SINGLETON);
       rightExchange = LogicalExchange.create(rightInput, 
RelDistributions.BROADCAST_DISTRIBUTED);
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 40841d7c72..176bf3edc8 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -45,6 +45,7 @@ public class QueryEnvironmentTestBase {
     return new Object[][] {
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2"},
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 
>= 0"},
+        new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2"},
         new Object[]{"SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = 
b.col2 "
             + "WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0"},
     };
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 839b3a901c..5c5ac00c0f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -44,7 +44,7 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 public class HashJoinOperator extends BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "BROADCAST_JOIN";
 
-  private final HashMap<Object, List<Object[]>> _broadcastHashTable;
+  private final HashMap<Integer, List<Object[]>> _broadcastHashTable;
   private final BaseOperator<TransferableBlock> _leftTableOperator;
   private final BaseOperator<TransferableBlock> _rightTableOperator;
 
@@ -52,12 +52,11 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
   private DataSchema _rightTableSchema;
   private int _resultRowSize;
   private boolean _isHashTableBuilt;
-  private KeySelector<Object[], Object> _leftKeySelector;
-  private KeySelector<Object[], Object> _rightKeySelector;
+  private KeySelector<Object[], Object[]> _leftKeySelector;
+  private KeySelector<Object[], Object[]> _rightKeySelector;
 
   public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator,
       BaseOperator<TransferableBlock> rightTableOperator, 
List<JoinNode.JoinClause> criteria) {
-    // TODO: this assumes right table is broadcast.
     _leftKeySelector = criteria.get(0).getLeftJoinKeySelector();
     _rightKeySelector = criteria.get(0).getRightJoinKeySelector();
     _leftTableOperator = leftTableOperator;
@@ -97,7 +96,7 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
         // put all the rows into corresponding hash collections keyed by the 
key selector function.
         for (Object[] row : container) {
           List<Object[]> hashCollection =
-              _broadcastHashTable.computeIfAbsent(_rightKeySelector.getKey(row), k -> new ArrayList<>());
+              _broadcastHashTable.computeIfAbsent(_rightKeySelector.computeHash(row), k -> new ArrayList<>());
           hashCollection.add(row);
         }
         rightBlock = _rightTableOperator.nextBlock();
@@ -117,7 +116,7 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
     List<Object[]> container = leftBlock.getContainer();
     for (Object[] leftRow : container) {
       List<Object[]> hashCollection =
-          _broadcastHashTable.getOrDefault(_leftKeySelector.getKey(leftRow), Collections.emptyList());
+          _broadcastHashTable.getOrDefault(_leftKeySelector.computeHash(leftRow), Collections.emptyList());
       for (Object[] rightRow : hashCollection) {
         rows.add(joinRow(leftRow, rightRow));
       }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index cacf73572c..632c617d4e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -58,7 +58,7 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
 
   private final List<ServerInstance> _receivingStageInstances;
   private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object> _keySelector;
+  private final KeySelector<Object[], Object[]> _keySelector;
   private final String _serverHostName;
   private final int _serverPort;
   private final long _jobId;
@@ -69,7 +69,7 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
 
   public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> 
mailboxService,
       BaseOperator<TransferableBlock> dataTableBlockBaseOperator, 
List<ServerInstance> receivingStageInstances,
-      RelDistribution.Type exchangeType, KeySelector<Object[], Object> 
keySelector, String hostName, int port,
+      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector, String hostName, int port,
       long jobId, int stageId) {
     _mailboxService = mailboxService;
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
@@ -91,7 +91,7 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
    */
   public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> 
mailboxService, BaseDataBlock dataTable,
       List<ServerInstance> receivingStageInstances, RelDistribution.Type 
exchangeType,
-      KeySelector<Object[], Object> keySelector, String hostName, int port, 
long jobId, int stageId) {
+      KeySelector<Object[], Object[]> keySelector, String hostName, int port, 
long jobId, int stageId) {
     _mailboxService = mailboxService;
     _dataTable = dataTable;
     _receivingStageInstances = receivingStageInstances;
@@ -169,7 +169,7 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
   }
 
   private static List<BaseDataBlock> constructPartitionedDataBlock(DataTable 
dataTable,
-      KeySelector<Object[], Object> keySelector, int partitionSize)
+      KeySelector<Object[], Object[]> keySelector, int partitionSize)
       throws Exception {
     List<List<Object[]>> temporaryRows = new ArrayList<>(partitionSize);
     for (int i = 0; i < partitionSize; i++) {
@@ -177,9 +177,8 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
     }
     for (int rowId = 0; rowId < dataTable.getNumberOfRows(); rowId++) {
       Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, 
rowId);
-      Object key = keySelector.getKey(row);
-      // TODO: support other partitioning algorithm
-      temporaryRows.get(hashToIndex(key, partitionSize)).add(row);
+      int partitionId = keySelector.computeHash(row) % partitionSize;
+      temporaryRows.get(partitionId).add(row);
     }
     List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
     for (int i = 0; i < partitionSize; i++) {
@@ -189,10 +188,6 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
     return dataTableList;
   }
 
-  private static int hashToIndex(Object key, int partitionSize) {
-    return (key.hashCode()) % partitionSize;
-  }
-
   private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock 
dataTable, boolean isEndOfStream)
       throws IOException {
     String mailboxId = toMailboxId(serverInstance);
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 4ef6b763f0..6ae43ce0cd 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -159,6 +159,10 @@ public class QueryRunnerTest {
         // thus the final JOIN result will be 15 x 1 = 15.
         new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col2", 15},
 
+        // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
+        // thus the final JOIN result will be 15 x 1 = 15.
+        new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2", 15},
+
         // Specifically table A has 15 rows (10 on server1 and 5 on server2) 
and table B has 5 rows (all on server1),
         // but only 1 out of 5 rows from table A will be selected out; and all 
in table B will be selected.
         // thus the final JOIN result will be 1 x 3 x 1 = 3.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to