This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5758aef1ce Optimize lookup table in join operator (#14972) 5758aef1ce is described below commit 5758aef1ce41124a97d4ba0f6467400978ca4dd2 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Feb 27 11:21:57 2025 -0700 Optimize lookup table in join operator (#14972) --- .../query/runtime/operator/BaseJoinOperator.java | 1 + .../query/runtime/operator/HashJoinOperator.java | 203 ++++++++++++++------- .../runtime/operator/join/DoubleLookupTable.java | 65 +++++++ .../runtime/operator/join/FloatLookupTable.java | 65 +++++++ .../runtime/operator/join/IntLookupTable.java | 65 +++++++ .../runtime/operator/join/LongLookupTable.java | 65 +++++++ .../query/runtime/operator/join/LookupTable.java | 100 ++++++++++ .../runtime/operator/join/ObjectLookupTable.java | 64 +++++++ 8 files changed, 557 insertions(+), 71 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java index 23d7c29710..3eee997c20 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java @@ -228,6 +228,7 @@ public abstract class BaseJoinOperator extends MultiStageOperator { protected abstract List<Object[]> buildNonMatchRightRows(); + // TODO: Optimize this to avoid unnecessary object copy. 
protected Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[] rightRow) { Object[] resultRow = new Object[_resultColumnSize]; if (leftRow != null) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index b51fc216ec..3a42e5da3b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -31,8 +31,13 @@ import org.apache.pinot.query.planner.partitioning.KeySelectorFactory; import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.operator.join.DoubleLookupTable; +import org.apache.pinot.query.runtime.operator.join.FloatLookupTable; +import org.apache.pinot.query.runtime.operator.join.IntLookupTable; +import org.apache.pinot.query.runtime.operator.join.LongLookupTable; +import org.apache.pinot.query.runtime.operator.join.LookupTable; +import org.apache.pinot.query.runtime.operator.join.ObjectLookupTable; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.spi.utils.BooleanUtils; import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode; @@ -40,27 +45,50 @@ import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOver * This {@code HashJoinOperator} join algorithm with join keys. Right table is materialized into a hash table. */ // TODO: Support memory size based resource limit. 
+@SuppressWarnings("unchecked") public class HashJoinOperator extends BaseJoinOperator { private static final String EXPLAIN_NAME = "HASH_JOIN"; - private static final int INITIAL_HEURISTIC_SIZE = 16; + + // Placeholder for BitSet in _matchedRightRows when all keys are unique in the right table. + private static final BitSet BIT_SET_PLACEHOLDER = new BitSet(0); private final KeySelector<?> _leftKeySelector; private final KeySelector<?> _rightKeySelector; - private final Map<Object, ArrayList<Object[]>> _rightTable; + private final LookupTable _rightTable; // Track matched right rows for right join and full join to output non-matched right rows. // TODO: Revisit whether we should use IntList or RoaringBitmap for smaller memory footprint. + // TODO: Optimize this private final Map<Object, BitSet> _matchedRightRows; public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftInput, DataSchema leftSchema, MultiStageOperator rightInput, JoinNode node) { super(context, leftInput, leftSchema, rightInput, node); - Preconditions.checkState(!node.getLeftKeys().isEmpty(), "Hash join operator requires join keys"); - _leftKeySelector = KeySelectorFactory.getKeySelector(node.getLeftKeys()); + List<Integer> leftKeys = node.getLeftKeys(); + Preconditions.checkState(!leftKeys.isEmpty(), "Hash join operator requires join keys"); + _leftKeySelector = KeySelectorFactory.getKeySelector(leftKeys); _rightKeySelector = KeySelectorFactory.getKeySelector(node.getRightKeys()); - _rightTable = new HashMap<>(); + _rightTable = createLookupTable(leftKeys, leftSchema); _matchedRightRows = needUnmatchedRightRows() ? 
new HashMap<>() : null; } + private static LookupTable createLookupTable(List<Integer> joinKeys, DataSchema schema) { + if (joinKeys.size() > 1) { + return new ObjectLookupTable(); + } + switch (schema.getColumnDataType(joinKeys.get(0)).getStoredType()) { + case INT: + return new IntLookupTable(); + case LONG: + return new LongLookupTable(); + case FLOAT: + return new FloatLookupTable(); + case DOUBLE: + return new DoubleLookupTable(); + default: + return new ObjectLookupTable(); + } + } + @Override public String toExplainString() { return EXPLAIN_NAME; @@ -71,41 +99,35 @@ public class HashJoinOperator extends BaseJoinOperator { throws ProcessingException { LOGGER.trace("Building hash table for join operator"); long startTime = System.currentTimeMillis(); - int numRowsInHashTable = 0; + int numRows = 0; TransferableBlock rightBlock = _rightInput.nextBlock(); while (!TransferableBlockUtils.isEndOfStream(rightBlock)) { - List<Object[]> container = rightBlock.getContainer(); + List<Object[]> rows = rightBlock.getContainer(); // Row based overflow check. - if (container.size() + numRowsInHashTable > _maxRowsInJoin) { + if (rows.size() + numRows > _maxRowsInJoin) { if (_joinOverflowMode == JoinOverFlowMode.THROW) { throwProcessingExceptionForJoinRowLimitExceeded( "Cannot build in memory hash table for join operator, reached number of rows limit: " + _maxRowsInJoin); } else { // Just fill up the buffer. - int remainingRows = _maxRowsInJoin - numRowsInHashTable; - container = container.subList(0, remainingRows); + int remainingRows = _maxRowsInJoin - numRows; + rows = rows.subList(0, remainingRows); _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true); // setting only the rightTableOperator to be early terminated and awaits EOS block next. _rightInput.earlyTerminate(); } } - // put all the rows into corresponding hash collections keyed by the key selector function. 
- for (Object[] row : container) { - ArrayList<Object[]> hashCollection = - _rightTable.computeIfAbsent(_rightKeySelector.getKey(row), k -> new ArrayList<>(INITIAL_HEURISTIC_SIZE)); - int size = hashCollection.size(); - if ((size & size - 1) == 0 && size < _maxRowsInJoin && size < Integer.MAX_VALUE / 2) { // is power of 2 - hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInJoin)); - } - hashCollection.add(row); + for (Object[] row : rows) { + _rightTable.addRow(_rightKeySelector.getKey(row), row); } - numRowsInHashTable += container.size(); + numRows += rows.size(); sampleAndCheckInterruption(); rightBlock = _rightInput.nextBlock(); } if (rightBlock.isErrorBlock()) { _upstreamErrorBlock = rightBlock; } else { + _rightTable.finish(); _isRightTableBuilt = true; _rightSideStats = rightBlock.getQueryStats(); assert _rightSideStats != null; @@ -123,69 +145,99 @@ public class HashJoinOperator extends BaseJoinOperator { case ANTI: return buildJoinedDataBlockAnti(leftBlock); default: { // INNER, LEFT, RIGHT, FULL - return buildJoinedDataBlockDefault(leftBlock); + if (_rightTable.isKeysUnique()) { + return buildJoinedDataBlockUniqueKeys(leftBlock); + } else { + return buildJoinedDataBlockDuplicateKeys(leftBlock); + } } } } - private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) + private List<Object[]> buildJoinedDataBlockUniqueKeys(TransferableBlock leftBlock) throws ProcessingException { - List<Object[]> container = leftBlock.getContainer(); - ArrayList<Object[]> rows = new ArrayList<>(container.size()); + List<Object[]> leftRows = leftBlock.getContainer(); + ArrayList<Object[]> rows = new ArrayList<>(leftRows.size()); - for (Object[] leftRow : container) { + for (Object[] leftRow : leftRows) { Object key = _leftKeySelector.getKey(leftRow); - // NOTE: Empty key selector will always give same hash code. 
- List<Object[]> rightRows = _rightTable.get(key); - if (rightRows == null) { - if (needUnmatchedLeftRows()) { - if (isMaxRowsLimitReached(rows.size())) { - break; - } - rows.add(joinRow(leftRow, null)); - } - continue; - } - boolean hasMatchForLeftRow = false; - int numRightRows = rightRows.size(); - rows.ensureCapacity(rows.size() + numRightRows); - boolean maxRowsLimitReached = false; - for (int i = 0; i < numRightRows; i++) { - Object[] rightRow = rightRows.get(i); - // TODO: Optimize this to avoid unnecessary object copy. + Object[] rightRow = (Object[]) _rightTable.lookup(key); + if (rightRow == null) { + handleUnmatchedLeftRow(leftRow, rows); + } else { Object[] resultRow = joinRow(leftRow, rightRow); - if (_nonEquiEvaluators.isEmpty() || _nonEquiEvaluators.stream() - .allMatch(evaluator -> BooleanUtils.isTrueInternalValue(evaluator.apply(resultRow)))) { + if (matchNonEquiConditions(resultRow)) { if (isMaxRowsLimitReached(rows.size())) { - maxRowsLimitReached = true; break; } rows.add(resultRow); - hasMatchForLeftRow = true; if (_matchedRightRows != null) { - _matchedRightRows.computeIfAbsent(key, k -> new BitSet(numRightRows)).set(i); + _matchedRightRows.put(key, BIT_SET_PLACEHOLDER); } + } else { + handleUnmatchedLeftRow(leftRow, rows); } } - if (maxRowsLimitReached) { - break; - } - if (!hasMatchForLeftRow && needUnmatchedLeftRows()) { - if (isMaxRowsLimitReached(rows.size())) { + } + + return rows; + } + + private List<Object[]> buildJoinedDataBlockDuplicateKeys(TransferableBlock leftBlock) + throws ProcessingException { + List<Object[]> leftRows = leftBlock.getContainer(); + List<Object[]> rows = new ArrayList<>(leftRows.size()); + + for (Object[] leftRow : leftRows) { + Object key = _leftKeySelector.getKey(leftRow); + List<Object[]> rightRows = (List<Object[]>) _rightTable.lookup(key); + if (rightRows == null) { + handleUnmatchedLeftRow(leftRow, rows); + } else { + boolean maxRowsLimitReached = false; + boolean hasMatchForLeftRow = false; + int 
numRightRows = rightRows.size(); + for (int i = 0; i < numRightRows; i++) { + Object[] resultRow = joinRow(leftRow, rightRows.get(i)); + if (matchNonEquiConditions(resultRow)) { + if (isMaxRowsLimitReached(rows.size())) { + maxRowsLimitReached = true; + break; + } + rows.add(resultRow); + hasMatchForLeftRow = true; + if (_matchedRightRows != null) { + _matchedRightRows.computeIfAbsent(key, k -> new BitSet(numRightRows)).set(i); + } + } + } + if (maxRowsLimitReached) { break; } - rows.add(joinRow(leftRow, null)); + if (!hasMatchForLeftRow) { + handleUnmatchedLeftRow(leftRow, rows); + } } } return rows; } + private void handleUnmatchedLeftRow(Object[] leftRow, List<Object[]> rows) + throws ProcessingException { + if (needUnmatchedLeftRows()) { + if (isMaxRowsLimitReached(rows.size())) { + return; + } + rows.add(joinRow(leftRow, null)); + } + } + private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) { - List<Object[]> container = leftBlock.getContainer(); - List<Object[]> rows = new ArrayList<>(container.size()); + List<Object[]> leftRows = leftBlock.getContainer(); + List<Object[]> rows = new ArrayList<>(leftRows.size()); - for (Object[] leftRow : container) { + for (Object[] leftRow : leftRows) { Object key = _leftKeySelector.getKey(leftRow); // SEMI-JOIN only checks existence of the key if (_rightTable.containsKey(key)) { @@ -197,10 +249,10 @@ public class HashJoinOperator extends BaseJoinOperator { } private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) { - List<Object[]> container = leftBlock.getContainer(); - List<Object[]> rows = new ArrayList<>(container.size()); + List<Object[]> leftRows = leftBlock.getContainer(); + List<Object[]> rows = new ArrayList<>(leftRows.size()); - for (Object[] leftRow : container) { + for (Object[] leftRow : leftRows) { Object key = _leftKeySelector.getKey(leftRow); // ANTI-JOIN only checks non-existence of the key if (!_rightTable.containsKey(key)) { @@ -214,18 +266,27 @@ public class 
HashJoinOperator extends BaseJoinOperator { @Override protected List<Object[]> buildNonMatchRightRows() { List<Object[]> rows = new ArrayList<>(); - for (Map.Entry<Object, ArrayList<Object[]>> entry : _rightTable.entrySet()) { - List<Object[]> rightRows = entry.getValue(); - BitSet matchedIndices = _matchedRightRows.get(entry.getKey()); - if (matchedIndices == null) { - for (Object[] rightRow : rightRows) { + if (_rightTable.isKeysUnique()) { + for (Map.Entry<Object, Object[]> entry : _rightTable.entrySet()) { + Object[] rightRow = entry.getValue(); + if (!_matchedRightRows.containsKey(entry.getKey())) { rows.add(joinRow(null, rightRow)); } - } else { - int numRightRows = rightRows.size(); - int unmatchedIndex = 0; - while ((unmatchedIndex = matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) { - rows.add(joinRow(null, rightRows.get(unmatchedIndex++))); + } + } else { + for (Map.Entry<Object, ArrayList<Object[]>> entry : _rightTable.entrySet()) { + List<Object[]> rightRows = entry.getValue(); + BitSet matchedIndices = _matchedRightRows.get(entry.getKey()); + if (matchedIndices == null) { + for (Object[] rightRow : rightRows) { + rows.add(joinRow(null, rightRow)); + } + } else { + int numRightRows = rightRows.size(); + int unmatchedIndex = 0; + while ((unmatchedIndex = matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) { + rows.add(joinRow(null, rightRows.get(unmatchedIndex++))); + } } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/DoubleLookupTable.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/DoubleLookupTable.java new file mode 100644 index 0000000000..77c9266d39 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/DoubleLookupTable.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator.join; + +import it.unimi.dsi.fastutil.doubles.Double2ObjectMap; +import it.unimi.dsi.fastutil.doubles.Double2ObjectOpenHashMap; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + + +/** + * The {@code DoubleLookupTable} is a lookup table for double keys. 
+ */ +@SuppressWarnings("unchecked") +public class DoubleLookupTable extends LookupTable { + private final Double2ObjectOpenHashMap<Object> _lookupTable = new Double2ObjectOpenHashMap<>(INITIAL_CAPACITY); + + @Override + public void addRow(Object key, Object[] row) { + _lookupTable.compute((double) key, (k, v) -> computeNewValue(row, v)); + } + + @Override + public void finish() { + if (!_keysUnique) { + for (Double2ObjectMap.Entry<Object> entry : _lookupTable.double2ObjectEntrySet()) { + convertValueToList(entry); + } + } + } + + @Override + public boolean containsKey(Object key) { + return _lookupTable.containsKey((double) key); + } + + @Nullable + @Override + public Object lookup(Object key) { + return _lookupTable.get((double) key); + } + + @SuppressWarnings("rawtypes") + @Override + public Set<Map.Entry> entrySet() { + return (Set) _lookupTable.double2ObjectEntrySet(); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/FloatLookupTable.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/FloatLookupTable.java new file mode 100644 index 0000000000..437b3f8547 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/FloatLookupTable.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator.join; + +import it.unimi.dsi.fastutil.floats.Float2ObjectMap; +import it.unimi.dsi.fastutil.floats.Float2ObjectOpenHashMap; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + + +/** + * The {@code FloatLookupTable} is a lookup table for float keys. + */ +@SuppressWarnings("unchecked") +public class FloatLookupTable extends LookupTable { + private final Float2ObjectOpenHashMap<Object> _lookupTable = new Float2ObjectOpenHashMap<>(INITIAL_CAPACITY); + + @Override + public void addRow(Object key, Object[] row) { + _lookupTable.compute((float) key, (k, v) -> computeNewValue(row, v)); + } + + @Override + public void finish() { + if (!_keysUnique) { + for (Float2ObjectMap.Entry<Object> entry : _lookupTable.float2ObjectEntrySet()) { + convertValueToList(entry); + } + } + } + + @Override + public boolean containsKey(Object key) { + return _lookupTable.containsKey((float) key); + } + + @Nullable + @Override + public Object lookup(Object key) { + return _lookupTable.get((float) key); + } + + @SuppressWarnings("rawtypes") + @Override + public Set<Map.Entry> entrySet() { + return (Set) _lookupTable.float2ObjectEntrySet(); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/IntLookupTable.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/IntLookupTable.java new file mode 100644 index 0000000000..688192b6cc --- /dev/null +++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/IntLookupTable.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator.join; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + + +/** + * The {@code IntLookupTable} is a lookup table for int keys. 
+ */ +@SuppressWarnings("unchecked") +public class IntLookupTable extends LookupTable { + private final Int2ObjectOpenHashMap<Object> _lookupTable = new Int2ObjectOpenHashMap<>(INITIAL_CAPACITY); + + @Override + public void addRow(Object key, Object[] row) { + _lookupTable.compute((int) key, (k, v) -> computeNewValue(row, v)); + } + + @Override + public void finish() { + if (!_keysUnique) { + for (Int2ObjectMap.Entry<Object> entry : _lookupTable.int2ObjectEntrySet()) { + convertValueToList(entry); + } + } + } + + @Override + public boolean containsKey(Object key) { + return _lookupTable.containsKey((int) key); + } + + @Nullable + @Override + public Object lookup(Object key) { + return _lookupTable.get((int) key); + } + + @SuppressWarnings("rawtypes") + @Override + public Set<Map.Entry> entrySet() { + return (Set) _lookupTable.int2ObjectEntrySet(); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LongLookupTable.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LongLookupTable.java new file mode 100644 index 0000000000..5e393f4647 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LongLookupTable.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator.join; + +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + + +/** + * The {@code LongLookupTable} is a lookup table for long keys. + */ +@SuppressWarnings("unchecked") +public class LongLookupTable extends LookupTable { + private final Long2ObjectOpenHashMap<Object> _lookupTable = new Long2ObjectOpenHashMap<>(INITIAL_CAPACITY); + + @Override + public void addRow(Object key, Object[] row) { + _lookupTable.compute((long) key, (k, v) -> computeNewValue(row, v)); + } + + @Override + public void finish() { + if (!_keysUnique) { + for (Long2ObjectMap.Entry<Object> entry : _lookupTable.long2ObjectEntrySet()) { + convertValueToList(entry); + } + } + } + + @Override + public boolean containsKey(Object key) { + return _lookupTable.containsKey((long) key); + } + + @Nullable + @Override + public Object lookup(Object key) { + return _lookupTable.get((long) key); + } + + @SuppressWarnings("rawtypes") + @Override + public Set<Map.Entry> entrySet() { + return (Set) _lookupTable.long2ObjectEntrySet(); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LookupTable.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LookupTable.java new file mode 100644 index 0000000000..0b62092bbe --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LookupTable.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator.join; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + + +public abstract class LookupTable { + // TODO: Make it configurable + protected static final int INITIAL_CAPACITY = 10000; + + protected boolean _keysUnique = true; + + /** + * Adds a row to the lookup table. + */ + public abstract void addRow(Object key, Object[] row); + + @SuppressWarnings("unchecked") + protected Object computeNewValue(Object[] row, @Nullable Object currentValue) { + if (currentValue == null) { + return row; + } else { + _keysUnique = false; + if (currentValue instanceof List) { + ((List<Object[]>) currentValue).add(row); + return currentValue; + } else { + List<Object[]> rows = new ArrayList<>(); + rows.add((Object[]) currentValue); + rows.add(row); + return rows; + } + } + } + + /** + * Finishes adding rows to the lookup table. This method should be called after all rows are added to the lookup + * table, and before looking up rows. 
+ */ + public abstract void finish(); + + protected static void convertValueToList(Map.Entry<?, Object> entry) { + Object value = entry.getValue(); + if (value instanceof Object[]) { + entry.setValue(Collections.singletonList(value)); + } + } + + /** + * Returns {@code true} when all the keys added to the lookup table are unique. + * When all keys are unique, the value of the lookup table is a single row ({@code Object[]}). When keys are not + * unique, the value of the lookup table is a list of rows ({@code List<Object[]>}). + */ + public boolean isKeysUnique() { + return _keysUnique; + } + + /** + * Returns {@code true} if the lookup table contains the given key. + */ + public abstract boolean containsKey(Object key); + + /** + * Returns the row/rows for the given key. When {@link #isKeysUnique} returns {@code true}, this method returns a + * single row ({@code Object[]}). When {@link #isKeysUnique} returns {@code false}, this method returns a list of rows + * ({@code List<Object[]>}). Returns {@code null} if the key does not exist in the lookup table. + */ + @Nullable + public abstract Object lookup(Object key); + + /** + * Returns all the entries in the lookup table. When {@link #isKeysUnique} returns {@code true}, the value of the + * entries is a single row ({@code Object[]}). When {@link #isKeysUnique} returns {@code false}, the value of the + * entries is a list of rows ({@code List<Object[]>}). 
+ */ +  @SuppressWarnings("rawtypes") +  public abstract Set<Map.Entry> entrySet(); +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java new file mode 100644 index 0000000000..f455b1a8c3 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *   http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator.join; + +import com.google.common.collect.Maps; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + + +/** + * The {@code ObjectLookupTable} is a lookup table for non-primitive keys.
+ */ +@SuppressWarnings("unchecked") +public class ObjectLookupTable extends LookupTable { + private final Map<Object, Object> _lookupTable = Maps.newHashMapWithExpectedSize(INITIAL_CAPACITY); + + @Override + public void addRow(Object key, Object[] row) { + _lookupTable.compute(key, (k, v) -> computeNewValue(row, v)); + } + + @Override + public void finish() { + if (!_keysUnique) { + for (Map.Entry<Object, Object> entry : _lookupTable.entrySet()) { + convertValueToList(entry); + } + } + } + + @Override + public boolean containsKey(Object key) { + return _lookupTable.containsKey(key); + } + + @Nullable + @Override + public Object lookup(Object key) { + return _lookupTable.get(key); + } + + @SuppressWarnings("rawtypes") + @Override + public Set<Map.Entry> entrySet() { + return (Set) _lookupTable.entrySet(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org