siddharthteotia commented on a change in pull request #6559: URL: https://github.com/apache/incubator-pinot/pull/6559#discussion_r573305843
########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java ########## @@ -61,8 +61,16 @@ * bounded by the number of groups limit (globalGroupIdUpperBound is always smaller or equal to numGroupsLimit). */ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { - private final static int INITIAL_MAP_SIZE = 256; - private final static int MAX_CACHING_MAP_SIZE = 1048576; + // NOTE: map size = map capacity (power of 2) * load factor + private static final int INITIAL_MAP_SIZE = (int) ((1 << 10) * 0.75f); + private static final int MAX_CACHING_MAP_SIZE = (int) ((1 << 20) * 0.75f); + + private static final ThreadLocal<IntGroupIdMap> THREAD_LOCAL_INT_MAP = ThreadLocal.withInitial(IntGroupIdMap::new); + private static final ThreadLocal<Long2IntOpenHashMap> THREAD_LOCAL_LONG_MAP = Review comment: Could the Long2IntOpenHashMap also be reimplemented with a different memory layout for better locality and cache friendliness, similar to how this PR implements IntGroupIdMap? 
########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java ########## @@ -108,35 +116,35 @@ public DictionaryBasedGroupKeyGenerator(TransformOperator transformOperator, Exp _isSingleValueColumn[i] = transformOperator.getResultMetadata(groupByExpression).isSingleValue(); } + // TODO: Clear the holder after processing the query instead of before if (longOverflow) { + // ArrayMapBasedHolder _globalGroupIdUpperBound = numGroupsLimit; - Object mapInternal = mapBasedRawKeyHolders.computeIfAbsent(ArrayMapBasedHolder.class.getName(), - o -> new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); - _rawKeyHolder = new ArrayMapBasedHolder(mapInternal); - if (((Object2IntOpenHashMap) mapInternal).size() > MAX_CACHING_MAP_SIZE) { - mapBasedRawKeyHolders - .put(ArrayMapBasedHolder.class.getName(), new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); + Object2IntOpenHashMap<IntArray> groupIdMap = THREAD_LOCAL_INT_ARRAY_MAP.get(); + int size = groupIdMap.size(); + groupIdMap.clear(); + if (size > MAX_CACHING_MAP_SIZE) { + groupIdMap.trim(); } + _rawKeyHolder = new ArrayMapBasedHolder(groupIdMap); } else { if (cardinalityProduct > Integer.MAX_VALUE) { + // LongMapBasedHolder _globalGroupIdUpperBound = numGroupsLimit; - Object mapInternal = mapBasedRawKeyHolders.computeIfAbsent(LongMapBasedHolder.class.getName(), - o -> new LongMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); - _rawKeyHolder = new LongMapBasedHolder(mapInternal); - if (((Long2IntOpenHashMap) mapInternal).size() > MAX_CACHING_MAP_SIZE) { - mapBasedRawKeyHolders - .put(ArrayMapBasedHolder.class.getName(), new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); + Long2IntOpenHashMap groupIdMap = THREAD_LOCAL_LONG_MAP.get(); + int size = groupIdMap.size(); + groupIdMap.clear(); + if (size > MAX_CACHING_MAP_SIZE) { + groupIdMap.trim(); } + _rawKeyHolder = new LongMapBasedHolder(groupIdMap); } else { _globalGroupIdUpperBound = 
Math.min((int) cardinalityProduct, numGroupsLimit); if (cardinalityProduct > arrayBasedThreshold) { - Object mapInternal = mapBasedRawKeyHolders.computeIfAbsent(IntMapBasedHolder.class.getName(), - o -> new IntMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); - _rawKeyHolder = new IntMapBasedHolder(mapInternal); - if (((Int2IntOpenHashMap) mapInternal).size() > MAX_CACHING_MAP_SIZE) { - mapBasedRawKeyHolders - .put(ArrayMapBasedHolder.class.getName(), new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); - } + // IntMapBasedHolder + IntGroupIdMap groupIdMap = THREAD_LOCAL_INT_MAP.get(); + groupIdMap.clear(); Review comment: What is the difference between trim() and clear() ? Can we use the same API? ########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java ########## @@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) { return groupKeyBuilder.toString(); } + /** + * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return value. + * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap}, this map uses one single array to store + * keys and values to reduce the cache miss. + */ + @VisibleForTesting + public static class IntGroupIdMap { + private static final float LOAD_FACTOR = 0.75f; + + private int[] _keyValueHolder; + private int _capacity; + private int _mask; + private int _maxNumEntries; + private int _size; + + public IntGroupIdMap() { + _capacity = 1 << 10; + int holderSize = _capacity << 1; Review comment: So looks like there are 2048 slots in the keyValueHolder array and we are going to use the lower order 12 bits of the incoming key for hash computation to get to the index/slot. And we will resize when 750 slots are full. Right ? 
How did we come up with these values (1024, 2048, etc.)? ########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java ########## @@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) { return groupKeyBuilder.toString(); } + /** + * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return value. + * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap}, this map uses one single array to store + * keys and values to reduce the cache miss. + */ + @VisibleForTesting + public static class IntGroupIdMap { + private static final float LOAD_FACTOR = 0.75f; + + private int[] _keyValueHolder; + private int _capacity; + private int _mask; + private int _maxNumEntries; + private int _size; + + public IntGroupIdMap() { + _capacity = 1 << 10; + int holderSize = _capacity << 1; + _keyValueHolder = new int[holderSize]; + _mask = holderSize - 1; + _maxNumEntries = (int) (_capacity * LOAD_FACTOR); + } + + public int size() { + return _size; + } + + /** + * Returns the group id for the given raw key. Create a new group id if the raw key does not exist and the group id + * upper bound is not reached. + */ + public int getGroupId(int rawKey, int groupIdUpperBound) { + // NOTE: Key 0 is reserved as the null key. Use (rawKey + 1) as the internal key because rawKey can never be -1. + int internalKey = rawKey + 1; + int index = (HashCommon.mix(internalKey) << 1) & _mask; + int key = _keyValueHolder[index]; + + // Handle hash hit separately for better performance + if (key == internalKey) { + return _keyValueHolder[index + 1]; + } + if (key == 0) { + return _size < groupIdUpperBound ? addNewGroup(internalKey, index) : INVALID_ID; Review comment: So `key == 0` means we hashed to an empty array slot, right? 
########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java ########## @@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) { return groupKeyBuilder.toString(); } + /** + * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return value. + * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap}, this map uses one single array to store + * keys and values to reduce the cache miss. + */ + @VisibleForTesting + public static class IntGroupIdMap { + private static final float LOAD_FACTOR = 0.75f; + + private int[] _keyValueHolder; + private int _capacity; + private int _mask; + private int _maxNumEntries; + private int _size; + + public IntGroupIdMap() { + _capacity = 1 << 10; + int holderSize = _capacity << 1; + _keyValueHolder = new int[holderSize]; + _mask = holderSize - 1; + _maxNumEntries = (int) (_capacity * LOAD_FACTOR); + } + + public int size() { + return _size; + } + + /** + * Returns the group id for the given raw key. Create a new group id if the raw key does not exist and the group id + * upper bound is not reached. + */ + public int getGroupId(int rawKey, int groupIdUpperBound) { + // NOTE: Key 0 is reserved as the null key. Use (rawKey + 1) as the internal key because rawKey can never be -1. + int internalKey = rawKey + 1; + int index = (HashCommon.mix(internalKey) << 1) & _mask; + int key = _keyValueHolder[index]; + + // Handle hash hit separately for better performance + if (key == internalKey) { + return _keyValueHolder[index + 1]; + } + if (key == 0) { + return _size < groupIdUpperBound ? addNewGroup(internalKey, index) : INVALID_ID; + } + + // Hash collision + while (true) { + index = (index + 2) & _mask; + key = _keyValueHolder[index]; + if (key == internalKey) { + return _keyValueHolder[index + 1]; + } + if (key == 0) { + return _size < groupIdUpperBound ? 
addNewGroup(internalKey, index) : INVALID_ID; + } + } + } + + private int addNewGroup(int internalKey, int index) { + int groupId = _size++; + _keyValueHolder[index] = internalKey; + _keyValueHolder[index + 1] = groupId; + if (_size > _maxNumEntries) { + expand(); + } + return groupId; + } + + private void expand() { + _capacity <<= 1; + int holderSize = _capacity << 1; + int[] oldKeyValueHolder = _keyValueHolder; + _keyValueHolder = new int[holderSize]; + _mask = holderSize - 1; + _maxNumEntries <<= 1; + int oldIndex = 0; + for (int i = 0; i < _size; i++) { + while (oldKeyValueHolder[oldIndex] == 0) { + oldIndex += 2; + } + int key = oldKeyValueHolder[oldIndex]; + int value = oldKeyValueHolder[oldIndex + 1]; + int newIndex = (HashCommon.mix(key) << 1) & _mask; + if (_keyValueHolder[newIndex] != 0) { + do { + newIndex = (newIndex + 2) & _mask; + } while (_keyValueHolder[newIndex] != 0); + } + _keyValueHolder[newIndex] = key; + _keyValueHolder[newIndex + 1] = value; + oldIndex += 2; + } + } + + public Iterator<Entry> iterator() { + return new Iterator<Entry>() { + private final Entry _entry = new Entry(); + private int _index; + private int _numRemainingEntries = _size; + + @Override + public boolean hasNext() { + return _numRemainingEntries > 0; + } + + @Override + public Entry next() { + int key; + while ((key = _keyValueHolder[_index]) == 0) { + _index += 2; + } + _entry._rawKey = key - 1; + _entry._groupId = _keyValueHolder[_index + 1]; + _index += 2; + _numRemainingEntries--; + return _entry; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + /** Review comment: There is no trimming here like the one in the combiner, where we sort and then trim. 
We are just zeroing out the keyValueHolder array and allocating a new array if size > MAX_CACHING_MAP_SIZE ########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java ########## @@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) { return groupKeyBuilder.toString(); } + /** + * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return value. + * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap}, this map uses one single array to store + * keys and values to reduce the cache miss. + */ + @VisibleForTesting + public static class IntGroupIdMap { + private static final float LOAD_FACTOR = 0.75f; + + private int[] _keyValueHolder; + private int _capacity; + private int _mask; + private int _maxNumEntries; + private int _size; + + public IntGroupIdMap() { + _capacity = 1 << 10; + int holderSize = _capacity << 1; + _keyValueHolder = new int[holderSize]; + _mask = holderSize - 1; + _maxNumEntries = (int) (_capacity * LOAD_FACTOR); + } + + public int size() { + return _size; + } + + /** + * Returns the group id for the given raw key. Create a new group id if the raw key does not exist and the group id + * upper bound is not reached. + */ + public int getGroupId(int rawKey, int groupIdUpperBound) { + // NOTE: Key 0 is reserved as the null key. Use (rawKey + 1) as the internal key because rawKey can never be -1. + int internalKey = rawKey + 1; + int index = (HashCommon.mix(internalKey) << 1) & _mask; + int key = _keyValueHolder[index]; + + // Handle hash hit separately for better performance + if (key == internalKey) { + return _keyValueHolder[index + 1]; + } + if (key == 0) { + return _size < groupIdUpperBound ? 
addNewGroup(internalKey, index) : INVALID_ID; + } + + // Hash collision + while (true) { + index = (index + 2) & _mask; + key = _keyValueHolder[index]; + if (key == internalKey) { + return _keyValueHolder[index + 1]; + } + if (key == 0) { + return _size < groupIdUpperBound ? addNewGroup(internalKey, index) : INVALID_ID; + } + } + } + + private int addNewGroup(int internalKey, int index) { + int groupId = _size++; + _keyValueHolder[index] = internalKey; + _keyValueHolder[index + 1] = groupId; + if (_size > _maxNumEntries) { + expand(); + } + return groupId; + } + + private void expand() { + _capacity <<= 1; + int holderSize = _capacity << 1; + int[] oldKeyValueHolder = _keyValueHolder; + _keyValueHolder = new int[holderSize]; + _mask = holderSize - 1; + _maxNumEntries <<= 1; + int oldIndex = 0; + for (int i = 0; i < _size; i++) { + while (oldKeyValueHolder[oldIndex] == 0) { + oldIndex += 2; + } + int key = oldKeyValueHolder[oldIndex]; + int value = oldKeyValueHolder[oldIndex + 1]; + int newIndex = (HashCommon.mix(key) << 1) & _mask; + if (_keyValueHolder[newIndex] != 0) { + do { + newIndex = (newIndex + 2) & _mask; + } while (_keyValueHolder[newIndex] != 0); + } + _keyValueHolder[newIndex] = key; + _keyValueHolder[newIndex + 1] = value; + oldIndex += 2; + } + } + + public Iterator<Entry> iterator() { + return new Iterator<Entry>() { + private final Entry _entry = new Entry(); + private int _index; + private int _numRemainingEntries = _size; + + @Override + public boolean hasNext() { + return _numRemainingEntries > 0; + } + + @Override + public Entry next() { + int key; + while ((key = _keyValueHolder[_index]) == 0) { + _index += 2; + } + _entry._rawKey = key - 1; + _entry._groupId = _keyValueHolder[_index + 1]; + _index += 2; + _numRemainingEntries--; + return _entry; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + /** + * Clears the map and trims the map if the size is larger than the {@link 
#MAX_CACHING_MAP_SIZE}. + */ + public void clear() { + if (_size == 0) { + return; + } + if (_size <= MAX_CACHING_MAP_SIZE) { + Arrays.fill(_keyValueHolder, 0); + } else { + _capacity = 1 << 10; Review comment: (nit) This is the same as the initialization in the constructor. Consider extracting it into an init() method and calling that from both places. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org