This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5758aef1ce Optimize lookup table in join operator (#14972)
5758aef1ce is described below

commit 5758aef1ce41124a97d4ba0f6467400978ca4dd2
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Feb 27 11:21:57 2025 -0700

    Optimize lookup table in join operator (#14972)
---
 .../query/runtime/operator/BaseJoinOperator.java   |   1 +
 .../query/runtime/operator/HashJoinOperator.java   | 203 ++++++++++++++-------
 .../runtime/operator/join/DoubleLookupTable.java   |  65 +++++++
 .../runtime/operator/join/FloatLookupTable.java    |  65 +++++++
 .../runtime/operator/join/IntLookupTable.java      |  65 +++++++
 .../runtime/operator/join/LongLookupTable.java     |  65 +++++++
 .../query/runtime/operator/join/LookupTable.java   | 100 ++++++++++
 .../runtime/operator/join/ObjectLookupTable.java   |  64 +++++++
 8 files changed, 557 insertions(+), 71 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index 23d7c29710..3eee997c20 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -228,6 +228,7 @@ public abstract class BaseJoinOperator extends 
MultiStageOperator {
 
   protected abstract List<Object[]> buildNonMatchRightRows();
 
+  // TODO: Optimize this to avoid unnecessary object copy.
   protected Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[] 
rightRow) {
     Object[] resultRow = new Object[_resultColumnSize];
     if (leftRow != null) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index b51fc216ec..3a42e5da3b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -31,8 +31,13 @@ import 
org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.join.DoubleLookupTable;
+import org.apache.pinot.query.runtime.operator.join.FloatLookupTable;
+import org.apache.pinot.query.runtime.operator.join.IntLookupTable;
+import org.apache.pinot.query.runtime.operator.join.LongLookupTable;
+import org.apache.pinot.query.runtime.operator.join.LookupTable;
+import org.apache.pinot.query.runtime.operator.join.ObjectLookupTable;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.utils.BooleanUtils;
 import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
 
 
@@ -40,27 +45,50 @@ import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOver
  * This {@code HashJoinOperator} join algorithm with join keys. Right table is 
materialized into a hash table.
  */
 // TODO: Support memory size based resource limit.
+@SuppressWarnings("unchecked")
 public class HashJoinOperator extends BaseJoinOperator {
   private static final String EXPLAIN_NAME = "HASH_JOIN";
-  private static final int INITIAL_HEURISTIC_SIZE = 16;
+
+  // Placeholder for BitSet in _matchedRightRows when all keys are unique in 
the right table.
+  private static final BitSet BIT_SET_PLACEHOLDER = new BitSet(0);
 
   private final KeySelector<?> _leftKeySelector;
   private final KeySelector<?> _rightKeySelector;
-  private final Map<Object, ArrayList<Object[]>> _rightTable;
+  private final LookupTable _rightTable;
   // Track matched right rows for right join and full join to output 
non-matched right rows.
   // TODO: Revisit whether we should use IntList or RoaringBitmap for smaller 
memory footprint.
+  // TODO: Optimize this
   private final Map<Object, BitSet> _matchedRightRows;
 
   public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator 
leftInput, DataSchema leftSchema,
       MultiStageOperator rightInput, JoinNode node) {
     super(context, leftInput, leftSchema, rightInput, node);
-    Preconditions.checkState(!node.getLeftKeys().isEmpty(), "Hash join 
operator requires join keys");
-    _leftKeySelector = KeySelectorFactory.getKeySelector(node.getLeftKeys());
+    List<Integer> leftKeys = node.getLeftKeys();
+    Preconditions.checkState(!leftKeys.isEmpty(), "Hash join operator requires 
join keys");
+    _leftKeySelector = KeySelectorFactory.getKeySelector(leftKeys);
     _rightKeySelector = KeySelectorFactory.getKeySelector(node.getRightKeys());
-    _rightTable = new HashMap<>();
+    _rightTable = createLookupTable(leftKeys, leftSchema);
     _matchedRightRows = needUnmatchedRightRows() ? new HashMap<>() : null;
   }
 
+  private static LookupTable createLookupTable(List<Integer> joinKeys, 
DataSchema schema) {
+    if (joinKeys.size() > 1) {
+      return new ObjectLookupTable();
+    }
+    switch (schema.getColumnDataType(joinKeys.get(0)).getStoredType()) {
+      case INT:
+        return new IntLookupTable();
+      case LONG:
+        return new LongLookupTable();
+      case FLOAT:
+        return new FloatLookupTable();
+      case DOUBLE:
+        return new DoubleLookupTable();
+      default:
+        return new ObjectLookupTable();
+    }
+  }
+
   @Override
   public String toExplainString() {
     return EXPLAIN_NAME;
@@ -71,41 +99,35 @@ public class HashJoinOperator extends BaseJoinOperator {
       throws ProcessingException {
     LOGGER.trace("Building hash table for join operator");
     long startTime = System.currentTimeMillis();
-    int numRowsInHashTable = 0;
+    int numRows = 0;
     TransferableBlock rightBlock = _rightInput.nextBlock();
     while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
-      List<Object[]> container = rightBlock.getContainer();
+      List<Object[]> rows = rightBlock.getContainer();
       // Row based overflow check.
-      if (container.size() + numRowsInHashTable > _maxRowsInJoin) {
+      if (rows.size() + numRows > _maxRowsInJoin) {
         if (_joinOverflowMode == JoinOverFlowMode.THROW) {
           throwProcessingExceptionForJoinRowLimitExceeded(
               "Cannot build in memory hash table for join operator, reached 
number of rows limit: " + _maxRowsInJoin);
         } else {
           // Just fill up the buffer.
-          int remainingRows = _maxRowsInJoin - numRowsInHashTable;
-          container = container.subList(0, remainingRows);
+          int remainingRows = _maxRowsInJoin - numRows;
+          rows = rows.subList(0, remainingRows);
           _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
           // setting only the rightTableOperator to be early terminated and 
awaits EOS block next.
           _rightInput.earlyTerminate();
         }
       }
-      // put all the rows into corresponding hash collections keyed by the key 
selector function.
-      for (Object[] row : container) {
-        ArrayList<Object[]> hashCollection =
-            _rightTable.computeIfAbsent(_rightKeySelector.getKey(row), k -> 
new ArrayList<>(INITIAL_HEURISTIC_SIZE));
-        int size = hashCollection.size();
-        if ((size & size - 1) == 0 && size < _maxRowsInJoin && size < 
Integer.MAX_VALUE / 2) { // is power of 2
-          hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInJoin));
-        }
-        hashCollection.add(row);
+      for (Object[] row : rows) {
+        _rightTable.addRow(_rightKeySelector.getKey(row), row);
       }
-      numRowsInHashTable += container.size();
+      numRows += rows.size();
       sampleAndCheckInterruption();
       rightBlock = _rightInput.nextBlock();
     }
     if (rightBlock.isErrorBlock()) {
       _upstreamErrorBlock = rightBlock;
     } else {
+      _rightTable.finish();
       _isRightTableBuilt = true;
       _rightSideStats = rightBlock.getQueryStats();
       assert _rightSideStats != null;
@@ -123,69 +145,99 @@ public class HashJoinOperator extends BaseJoinOperator {
       case ANTI:
         return buildJoinedDataBlockAnti(leftBlock);
       default: { // INNER, LEFT, RIGHT, FULL
-        return buildJoinedDataBlockDefault(leftBlock);
+        if (_rightTable.isKeysUnique()) {
+          return buildJoinedDataBlockUniqueKeys(leftBlock);
+        } else {
+          return buildJoinedDataBlockDuplicateKeys(leftBlock);
+        }
       }
     }
   }
 
-  private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock 
leftBlock)
+  private List<Object[]> buildJoinedDataBlockUniqueKeys(TransferableBlock 
leftBlock)
       throws ProcessingException {
-    List<Object[]> container = leftBlock.getContainer();
-    ArrayList<Object[]> rows = new ArrayList<>(container.size());
+    List<Object[]> leftRows = leftBlock.getContainer();
+    ArrayList<Object[]> rows = new ArrayList<>(leftRows.size());
 
-    for (Object[] leftRow : container) {
+    for (Object[] leftRow : leftRows) {
       Object key = _leftKeySelector.getKey(leftRow);
-      // NOTE: Empty key selector will always give same hash code.
-      List<Object[]> rightRows = _rightTable.get(key);
-      if (rightRows == null) {
-        if (needUnmatchedLeftRows()) {
-          if (isMaxRowsLimitReached(rows.size())) {
-            break;
-          }
-          rows.add(joinRow(leftRow, null));
-        }
-        continue;
-      }
-      boolean hasMatchForLeftRow = false;
-      int numRightRows = rightRows.size();
-      rows.ensureCapacity(rows.size() + numRightRows);
-      boolean maxRowsLimitReached = false;
-      for (int i = 0; i < numRightRows; i++) {
-        Object[] rightRow = rightRows.get(i);
-        // TODO: Optimize this to avoid unnecessary object copy.
+      Object[] rightRow = (Object[]) _rightTable.lookup(key);
+      if (rightRow == null) {
+        handleUnmatchedLeftRow(leftRow, rows);
+      } else {
         Object[] resultRow = joinRow(leftRow, rightRow);
-        if (_nonEquiEvaluators.isEmpty() || _nonEquiEvaluators.stream()
-            .allMatch(evaluator -> 
BooleanUtils.isTrueInternalValue(evaluator.apply(resultRow)))) {
+        if (matchNonEquiConditions(resultRow)) {
           if (isMaxRowsLimitReached(rows.size())) {
-            maxRowsLimitReached = true;
             break;
           }
           rows.add(resultRow);
-          hasMatchForLeftRow = true;
           if (_matchedRightRows != null) {
-            _matchedRightRows.computeIfAbsent(key, k -> new 
BitSet(numRightRows)).set(i);
+            _matchedRightRows.put(key, BIT_SET_PLACEHOLDER);
           }
+        } else {
+          handleUnmatchedLeftRow(leftRow, rows);
         }
       }
-      if (maxRowsLimitReached) {
-        break;
-      }
-      if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
-        if (isMaxRowsLimitReached(rows.size())) {
+    }
+
+    return rows;
+  }
+
+  private List<Object[]> buildJoinedDataBlockDuplicateKeys(TransferableBlock 
leftBlock)
+      throws ProcessingException {
+    List<Object[]> leftRows = leftBlock.getContainer();
+    List<Object[]> rows = new ArrayList<>(leftRows.size());
+
+    for (Object[] leftRow : leftRows) {
+      Object key = _leftKeySelector.getKey(leftRow);
+      List<Object[]> rightRows = (List<Object[]>) _rightTable.lookup(key);
+      if (rightRows == null) {
+        handleUnmatchedLeftRow(leftRow, rows);
+      } else {
+        boolean maxRowsLimitReached = false;
+        boolean hasMatchForLeftRow = false;
+        int numRightRows = rightRows.size();
+        for (int i = 0; i < numRightRows; i++) {
+          Object[] resultRow = joinRow(leftRow, rightRows.get(i));
+          if (matchNonEquiConditions(resultRow)) {
+            if (isMaxRowsLimitReached(rows.size())) {
+              maxRowsLimitReached = true;
+              break;
+            }
+            rows.add(resultRow);
+            hasMatchForLeftRow = true;
+            if (_matchedRightRows != null) {
+              _matchedRightRows.computeIfAbsent(key, k -> new 
BitSet(numRightRows)).set(i);
+            }
+          }
+        }
+        if (maxRowsLimitReached) {
           break;
         }
-        rows.add(joinRow(leftRow, null));
+        if (!hasMatchForLeftRow) {
+          handleUnmatchedLeftRow(leftRow, rows);
+        }
       }
     }
 
     return rows;
   }
 
+  private void handleUnmatchedLeftRow(Object[] leftRow, List<Object[]> rows)
+      throws ProcessingException {
+    if (needUnmatchedLeftRows()) {
+      if (isMaxRowsLimitReached(rows.size())) {
+        return;
+      }
+      rows.add(joinRow(leftRow, null));
+    }
+  }
+
   private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) 
{
-    List<Object[]> container = leftBlock.getContainer();
-    List<Object[]> rows = new ArrayList<>(container.size());
+    List<Object[]> leftRows = leftBlock.getContainer();
+    List<Object[]> rows = new ArrayList<>(leftRows.size());
 
-    for (Object[] leftRow : container) {
+    for (Object[] leftRow : leftRows) {
       Object key = _leftKeySelector.getKey(leftRow);
       // SEMI-JOIN only checks existence of the key
       if (_rightTable.containsKey(key)) {
@@ -197,10 +249,10 @@ public class HashJoinOperator extends BaseJoinOperator {
   }
 
   private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) 
{
-    List<Object[]> container = leftBlock.getContainer();
-    List<Object[]> rows = new ArrayList<>(container.size());
+    List<Object[]> leftRows = leftBlock.getContainer();
+    List<Object[]> rows = new ArrayList<>(leftRows.size());
 
-    for (Object[] leftRow : container) {
+    for (Object[] leftRow : leftRows) {
       Object key = _leftKeySelector.getKey(leftRow);
       // ANTI-JOIN only checks non-existence of the key
       if (!_rightTable.containsKey(key)) {
@@ -214,18 +266,27 @@ public class HashJoinOperator extends BaseJoinOperator {
   @Override
   protected List<Object[]> buildNonMatchRightRows() {
     List<Object[]> rows = new ArrayList<>();
-    for (Map.Entry<Object, ArrayList<Object[]>> entry : 
_rightTable.entrySet()) {
-      List<Object[]> rightRows = entry.getValue();
-      BitSet matchedIndices = _matchedRightRows.get(entry.getKey());
-      if (matchedIndices == null) {
-        for (Object[] rightRow : rightRows) {
+    if (_rightTable.isKeysUnique()) {
+      for (Map.Entry<Object, Object[]> entry : _rightTable.entrySet()) {
+        Object[] rightRow = entry.getValue();
+        if (!_matchedRightRows.containsKey(entry.getKey())) {
           rows.add(joinRow(null, rightRow));
         }
-      } else {
-        int numRightRows = rightRows.size();
-        int unmatchedIndex = 0;
-        while ((unmatchedIndex = matchedIndices.nextClearBit(unmatchedIndex)) 
< numRightRows) {
-          rows.add(joinRow(null, rightRows.get(unmatchedIndex++)));
+      }
+    } else {
+      for (Map.Entry<Object, ArrayList<Object[]>> entry : 
_rightTable.entrySet()) {
+        List<Object[]> rightRows = entry.getValue();
+        BitSet matchedIndices = _matchedRightRows.get(entry.getKey());
+        if (matchedIndices == null) {
+          for (Object[] rightRow : rightRows) {
+            rows.add(joinRow(null, rightRow));
+          }
+        } else {
+          int numRightRows = rightRows.size();
+          int unmatchedIndex = 0;
+          while ((unmatchedIndex = 
matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) {
+            rows.add(joinRow(null, rightRows.get(unmatchedIndex++)));
+          }
         }
       }
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/DoubleLookupTable.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/DoubleLookupTable.java
new file mode 100644
index 0000000000..77c9266d39
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/DoubleLookupTable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import it.unimi.dsi.fastutil.doubles.Double2ObjectMap;
+import it.unimi.dsi.fastutil.doubles.Double2ObjectOpenHashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code DoubleLookupTable} is a lookup table for double keys.
+ */
+@SuppressWarnings("unchecked")
+public class DoubleLookupTable extends LookupTable {
+  private final Double2ObjectOpenHashMap<Object> _lookupTable = new 
Double2ObjectOpenHashMap<>(INITIAL_CAPACITY);
+
+  @Override
+  public void addRow(Object key, Object[] row) {
+    _lookupTable.compute((double) key, (k, v) -> computeNewValue(row, v));
+  }
+
+  @Override
+  public void finish() {
+    if (!_keysUnique) {
+      for (Double2ObjectMap.Entry<Object> entry : 
_lookupTable.double2ObjectEntrySet()) {
+        convertValueToList(entry);
+      }
+    }
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return _lookupTable.containsKey((double) key);
+  }
+
+  @Nullable
+  @Override
+  public Object lookup(Object key) {
+    return _lookupTable.get((double) key);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Set<Map.Entry> entrySet() {
+    return (Set) _lookupTable.double2ObjectEntrySet();
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/FloatLookupTable.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/FloatLookupTable.java
new file mode 100644
index 0000000000..437b3f8547
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/FloatLookupTable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import it.unimi.dsi.fastutil.floats.Float2ObjectMap;
+import it.unimi.dsi.fastutil.floats.Float2ObjectOpenHashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code FloatLookupTable} is a lookup table for float keys.
+ */
+@SuppressWarnings("unchecked")
+public class FloatLookupTable extends LookupTable {
+  private final Float2ObjectOpenHashMap<Object> _lookupTable = new 
Float2ObjectOpenHashMap<>(INITIAL_CAPACITY);
+
+  @Override
+  public void addRow(Object key, Object[] row) {
+    _lookupTable.compute((float) key, (k, v) -> computeNewValue(row, v));
+  }
+
+  @Override
+  public void finish() {
+    if (!_keysUnique) {
+      for (Float2ObjectMap.Entry<Object> entry : 
_lookupTable.float2ObjectEntrySet()) {
+        convertValueToList(entry);
+      }
+    }
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return _lookupTable.containsKey((float) key);
+  }
+
+  @Nullable
+  @Override
+  public Object lookup(Object key) {
+    return _lookupTable.get((float) key);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Set<Map.Entry> entrySet() {
+    return (Set) _lookupTable.float2ObjectEntrySet();
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/IntLookupTable.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/IntLookupTable.java
new file mode 100644
index 0000000000..688192b6cc
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/IntLookupTable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code IntLookupTable} is a lookup table for int keys.
+ */
+@SuppressWarnings("unchecked")
+public class IntLookupTable extends LookupTable {
+  private final Int2ObjectOpenHashMap<Object> _lookupTable = new 
Int2ObjectOpenHashMap<>(INITIAL_CAPACITY);
+
+  @Override
+  public void addRow(Object key, Object[] row) {
+    _lookupTable.compute((int) key, (k, v) -> computeNewValue(row, v));
+  }
+
+  @Override
+  public void finish() {
+    if (!_keysUnique) {
+      for (Int2ObjectMap.Entry<Object> entry : 
_lookupTable.int2ObjectEntrySet()) {
+        convertValueToList(entry);
+      }
+    }
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return _lookupTable.containsKey((int) key);
+  }
+
+  @Nullable
+  @Override
+  public Object lookup(Object key) {
+    return _lookupTable.get((int) key);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Set<Map.Entry> entrySet() {
+    return (Set) _lookupTable.int2ObjectEntrySet();
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LongLookupTable.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LongLookupTable.java
new file mode 100644
index 0000000000..5e393f4647
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LongLookupTable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code LongLookupTable} is a lookup table for long keys.
+ */
+@SuppressWarnings("unchecked")
+public class LongLookupTable extends LookupTable {
+  private final Long2ObjectOpenHashMap<Object> _lookupTable = new 
Long2ObjectOpenHashMap<>(INITIAL_CAPACITY);
+
+  @Override
+  public void addRow(Object key, Object[] row) {
+    _lookupTable.compute((long) key, (k, v) -> computeNewValue(row, v));
+  }
+
+  @Override
+  public void finish() {
+    if (!_keysUnique) {
+      for (Long2ObjectMap.Entry<Object> entry : 
_lookupTable.long2ObjectEntrySet()) {
+        convertValueToList(entry);
+      }
+    }
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return _lookupTable.containsKey((long) key);
+  }
+
+  @Nullable
+  @Override
+  public Object lookup(Object key) {
+    return _lookupTable.get((long) key);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Set<Map.Entry> entrySet() {
+    return (Set) _lookupTable.long2ObjectEntrySet();
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LookupTable.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LookupTable.java
new file mode 100644
index 0000000000..0b62092bbe
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LookupTable.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+public abstract class LookupTable {
+  // TODO: Make it configurable
+  protected static final int INITIAL_CAPACITY = 10000;
+
+  protected boolean _keysUnique = true;
+
+  /**
+   * Adds a row to the lookup table.
+   */
+  public abstract void addRow(Object key, Object[] row);
+
+  @SuppressWarnings("unchecked")
+  protected Object computeNewValue(Object[] row, @Nullable Object 
currentValue) {
+    if (currentValue == null) {
+      return row;
+    } else {
+      _keysUnique = false;
+      if (currentValue instanceof List) {
+        ((List<Object[]>) currentValue).add(row);
+        return currentValue;
+      } else {
+        List<Object[]> rows = new ArrayList<>();
+        rows.add((Object[]) currentValue);
+        rows.add(row);
+        return rows;
+      }
+    }
+  }
+
+  /**
+   * Finishes adding rows to the lookup table. This method should be called 
after all rows are added to the lookup
+   * table, and before looking up rows.
+   */
+  public abstract void finish();
+
+  protected static void convertValueToList(Map.Entry<?, Object> entry) {
+    Object value = entry.getValue();
+    if (value instanceof Object[]) {
+      entry.setValue(Collections.singletonList(value));
+    }
+  }
+
+  /**
+   * Returns {@code true} when all the keys added to the lookup table are 
unique.
+   * When all keys are unique, the value of the lookup table is a single row 
({@code Object[]}). When keys are not
+   * unique, the value of the lookup table is a list of rows ({@code 
List<Object[]>}).
+   */
+  public boolean isKeysUnique() {
+    return _keysUnique;
+  }
+
+  /**
+   * Returns {@code true} if the lookup table contains the given key.
+   */
+  public abstract boolean containsKey(Object key);
+
+  /**
+   * Returns the row/rows for the given key. When {@link #isKeysUnique} 
returns {@code true}, this method returns a
+   * single row ({@code Object[]}). When {@link #isKeysUnique} returns {@code 
false}, this method returns a list of rows
+   * ({@code List<Object[]>}). Returns {@code null} if the key does not exist 
in the lookup table.
+   */
+  @Nullable
+  public abstract Object lookup(Object key);
+
+  /**
+   * Returns all the entries in the lookup table. When {@link #isKeysUnique} 
returns {@code true}, the value of the
+   * entries is a single row ({@code Object[]}). When {@link #isKeysUnique} 
returns {@code false}, the value of the
+   * entries is a list of rows ({@code List<Object[]>}).
+   */
+  @SuppressWarnings("rawtypes")
+  public abstract Set<Map.Entry> entrySet();
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
new file mode 100644
index 0000000000..f455b1a8c3
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code ObjectLookupTable} is a lookup table for non-primitive keys.
+ */
+@SuppressWarnings("unchecked")
+public class ObjectLookupTable extends LookupTable {
+  private final Map<Object, Object> _lookupTable = 
Maps.newHashMapWithExpectedSize(INITIAL_CAPACITY);
+
+  @Override
+  public void addRow(Object key, Object[] row) {
+    _lookupTable.compute(key, (k, v) -> computeNewValue(row, v));
+  }
+
+  @Override
+  public void finish() {
+    if (!_keysUnique) {
+      for (Map.Entry<Object, Object> entry : _lookupTable.entrySet()) {
+        convertValueToList(entry);
+      }
+    }
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return _lookupTable.containsKey(key);
+  }
+
+  @Nullable
+  @Override
+  public Object lookup(Object key) {
+    return _lookupTable.get(key);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Set<Map.Entry> entrySet() {
+    return (Set) _lookupTable.entrySet();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to