siddharthteotia commented on a change in pull request #6559:
URL: https://github.com/apache/incubator-pinot/pull/6559#discussion_r573305843



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -61,8 +61,16 @@
  * bounded by the number of groups limit (globalGroupIdUpperBound is always 
smaller or equal to numGroupsLimit).
  */
 public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
-  private final static int INITIAL_MAP_SIZE = 256;
-  private final static int MAX_CACHING_MAP_SIZE = 1048576;
+  // NOTE: map size = map capacity (power of 2) * load factor
+  private static final int INITIAL_MAP_SIZE = (int) ((1 << 10) * 0.75f);
+  private static final int MAX_CACHING_MAP_SIZE = (int) ((1 << 20) * 0.75f);
+
+  private static final ThreadLocal<IntGroupIdMap> THREAD_LOCAL_INT_MAP = 
ThreadLocal.withInitial(IntGroupIdMap::new);
+  private static final ThreadLocal<Long2IntOpenHashMap> THREAD_LOCAL_LONG_MAP =

Review comment:
       The Long2IntOpenHashMap can also be reimplemented in a different format 
for better locality and cache friendliness, right? Similar to how this PR 
implements IntGroupIdMap?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -108,35 +116,35 @@ public DictionaryBasedGroupKeyGenerator(TransformOperator 
transformOperator, Exp
 
       _isSingleValueColumn[i] = 
transformOperator.getResultMetadata(groupByExpression).isSingleValue();
     }
+    // TODO: Clear the holder after processing the query instead of before
     if (longOverflow) {
+      // ArrayMapBasedHolder
       _globalGroupIdUpperBound = numGroupsLimit;
-      Object mapInternal = 
mapBasedRawKeyHolders.computeIfAbsent(ArrayMapBasedHolder.class.getName(),
-          o -> new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
-      _rawKeyHolder = new ArrayMapBasedHolder(mapInternal);
-      if (((Object2IntOpenHashMap) mapInternal).size() > MAX_CACHING_MAP_SIZE) 
{
-        mapBasedRawKeyHolders
-            .put(ArrayMapBasedHolder.class.getName(), new 
ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
+      Object2IntOpenHashMap<IntArray> groupIdMap = 
THREAD_LOCAL_INT_ARRAY_MAP.get();
+      int size = groupIdMap.size();
+      groupIdMap.clear();
+      if (size > MAX_CACHING_MAP_SIZE) {
+        groupIdMap.trim();
       }
+      _rawKeyHolder = new ArrayMapBasedHolder(groupIdMap);
     } else {
       if (cardinalityProduct > Integer.MAX_VALUE) {
+        // LongMapBasedHolder
         _globalGroupIdUpperBound = numGroupsLimit;
-        Object mapInternal = 
mapBasedRawKeyHolders.computeIfAbsent(LongMapBasedHolder.class.getName(),
-            o -> new LongMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
-        _rawKeyHolder = new LongMapBasedHolder(mapInternal);
-        if (((Long2IntOpenHashMap) mapInternal).size() > MAX_CACHING_MAP_SIZE) 
{
-          mapBasedRawKeyHolders
-              .put(ArrayMapBasedHolder.class.getName(), new 
ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
+        Long2IntOpenHashMap groupIdMap = THREAD_LOCAL_LONG_MAP.get();
+        int size = groupIdMap.size();
+        groupIdMap.clear();
+        if (size > MAX_CACHING_MAP_SIZE) {
+          groupIdMap.trim();
         }
+        _rawKeyHolder = new LongMapBasedHolder(groupIdMap);
       } else {
         _globalGroupIdUpperBound = Math.min((int) cardinalityProduct, 
numGroupsLimit);
         if (cardinalityProduct > arrayBasedThreshold) {
-          Object mapInternal = 
mapBasedRawKeyHolders.computeIfAbsent(IntMapBasedHolder.class.getName(),
-              o -> new IntMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
-          _rawKeyHolder = new IntMapBasedHolder(mapInternal);
-          if (((Int2IntOpenHashMap) mapInternal).size() > 
MAX_CACHING_MAP_SIZE) {
-            mapBasedRawKeyHolders
-                .put(ArrayMapBasedHolder.class.getName(), new 
ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
-          }
+          // IntMapBasedHolder
+          IntGroupIdMap groupIdMap = THREAD_LOCAL_INT_MAP.get();
+          groupIdMap.clear();

Review comment:
       What is the difference between trim() and clear()? Can we use the same 
API?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) {
     return groupKeyBuilder.toString();
   }
 
+  /**
+   * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return 
value.
+   * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap}, 
this map uses one single array to store
+   * keys and values to reduce the cache miss.
+   */
+  @VisibleForTesting
+  public static class IntGroupIdMap {
+    private static final float LOAD_FACTOR = 0.75f;
+
+    private int[] _keyValueHolder;
+    private int _capacity;
+    private int _mask;
+    private int _maxNumEntries;
+    private int _size;
+
+    public IntGroupIdMap() {
+      _capacity = 1 << 10;
+      int holderSize = _capacity << 1;

Review comment:
       So it looks like there are 2048 slots in the keyValueHolder array, and we 
are going to use the lower order 12 bits of the incoming key for hash 
computation to get to the index/slot. And we will resize when 750 slots are 
full, right? How did we come up with these values of 1024, 2048, etc.?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) {
     return groupKeyBuilder.toString();
   }
 
+  /**
+   * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return 
value.
+   * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap}, 
this map uses one single array to store
+   * keys and values to reduce the cache miss.
+   */
+  @VisibleForTesting
+  public static class IntGroupIdMap {
+    private static final float LOAD_FACTOR = 0.75f;
+
+    private int[] _keyValueHolder;
+    private int _capacity;
+    private int _mask;
+    private int _maxNumEntries;
+    private int _size;
+
+    public IntGroupIdMap() {
+      _capacity = 1 << 10;
+      int holderSize = _capacity << 1;
+      _keyValueHolder = new int[holderSize];
+      _mask = holderSize - 1;
+      _maxNumEntries = (int) (_capacity * LOAD_FACTOR);
+    }
+
+    public int size() {
+      return _size;
+    }
+
+    /**
+     * Returns the group id for the given raw key. Create a new group id if 
the raw key does not exist and the group id
+     * upper bound is not reached.
+     */
+    public int getGroupId(int rawKey, int groupIdUpperBound) {
+      // NOTE: Key 0 is reserved as the null key. Use (rawKey + 1) as the 
internal key because rawKey can never be -1.
+      int internalKey = rawKey + 1;
+      int index = (HashCommon.mix(internalKey) << 1) & _mask;
+      int key = _keyValueHolder[index];
+
+      // Handle hash hit separately for better performance
+      if (key == internalKey) {
+        return _keyValueHolder[index + 1];
+      }
+      if (key == 0) {
+        return _size < groupIdUpperBound ? addNewGroup(internalKey, index) : 
INVALID_ID;

Review comment:
       So `key == 0` implies we hashed to an empty array slot, right?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) {
     return groupKeyBuilder.toString();
   }
 
+  /**
+   * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return 
value.
+   * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap}, 
this map uses one single array to store
+   * keys and values to reduce the cache miss.
+   */
+  @VisibleForTesting
+  public static class IntGroupIdMap {
+    private static final float LOAD_FACTOR = 0.75f;
+
+    private int[] _keyValueHolder;
+    private int _capacity;
+    private int _mask;
+    private int _maxNumEntries;
+    private int _size;
+
+    public IntGroupIdMap() {
+      _capacity = 1 << 10;
+      int holderSize = _capacity << 1;
+      _keyValueHolder = new int[holderSize];
+      _mask = holderSize - 1;
+      _maxNumEntries = (int) (_capacity * LOAD_FACTOR);
+    }
+
+    public int size() {
+      return _size;
+    }
+
+    /**
+     * Returns the group id for the given raw key. Create a new group id if 
the raw key does not exist and the group id
+     * upper bound is not reached.
+     */
+    public int getGroupId(int rawKey, int groupIdUpperBound) {
+      // NOTE: Key 0 is reserved as the null key. Use (rawKey + 1) as the 
internal key because rawKey can never be -1.
+      int internalKey = rawKey + 1;
+      int index = (HashCommon.mix(internalKey) << 1) & _mask;
+      int key = _keyValueHolder[index];
+
+      // Handle hash hit separately for better performance
+      if (key == internalKey) {
+        return _keyValueHolder[index + 1];
+      }
+      if (key == 0) {
+        return _size < groupIdUpperBound ? addNewGroup(internalKey, index) : 
INVALID_ID;
+      }
+
+      // Hash collision
+      while (true) {
+        index = (index + 2) & _mask;
+        key = _keyValueHolder[index];
+        if (key == internalKey) {
+          return _keyValueHolder[index + 1];
+        }
+        if (key == 0) {
+          return _size < groupIdUpperBound ? addNewGroup(internalKey, index) : 
INVALID_ID;
+        }
+      }
+    }
+
+    private int addNewGroup(int internalKey, int index) {
+      int groupId = _size++;
+      _keyValueHolder[index] = internalKey;
+      _keyValueHolder[index + 1] = groupId;
+      if (_size > _maxNumEntries) {
+        expand();
+      }
+      return groupId;
+    }
+
+    private void expand() {
+      _capacity <<= 1;
+      int holderSize = _capacity << 1;
+      int[] oldKeyValueHolder = _keyValueHolder;
+      _keyValueHolder = new int[holderSize];
+      _mask = holderSize - 1;
+      _maxNumEntries <<= 1;
+      int oldIndex = 0;
+      for (int i = 0; i < _size; i++) {
+        while (oldKeyValueHolder[oldIndex] == 0) {
+          oldIndex += 2;
+        }
+        int key = oldKeyValueHolder[oldIndex];
+        int value = oldKeyValueHolder[oldIndex + 1];
+        int newIndex = (HashCommon.mix(key) << 1) & _mask;
+        if (_keyValueHolder[newIndex] != 0) {
+          do {
+            newIndex = (newIndex + 2) & _mask;
+          } while (_keyValueHolder[newIndex] != 0);
+        }
+        _keyValueHolder[newIndex] = key;
+        _keyValueHolder[newIndex + 1] = value;
+        oldIndex += 2;
+      }
+    }
+
+    public Iterator<Entry> iterator() {
+      return new Iterator<Entry>() {
+        private final Entry _entry = new Entry();
+        private int _index;
+        private int _numRemainingEntries = _size;
+
+        @Override
+        public boolean hasNext() {
+          return _numRemainingEntries > 0;
+        }
+
+        @Override
+        public Entry next() {
+          int key;
+          while ((key = _keyValueHolder[_index]) == 0) {
+            _index += 2;
+          }
+          _entry._rawKey = key - 1;
+          _entry._groupId = _keyValueHolder[_index + 1];
+          _index += 2;
+          _numRemainingEntries--;
+          return _entry;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
+
+    /**

Review comment:
       There is no trimming here like the way it happens in the combiner, where 
we sort and trim. We are just zeroing out the keyValueHolder array, or 
allocating a new array if size > MAX_CACHING_MAP_SIZE.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) {
     return groupKeyBuilder.toString();
   }
 
+  /**
+   * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return 
value.
+   * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap}, 
this map uses one single array to store
+   * keys and values to reduce the cache miss.
+   */
+  @VisibleForTesting
+  public static class IntGroupIdMap {
+    private static final float LOAD_FACTOR = 0.75f;
+
+    private int[] _keyValueHolder;
+    private int _capacity;
+    private int _mask;
+    private int _maxNumEntries;
+    private int _size;
+
+    public IntGroupIdMap() {
+      _capacity = 1 << 10;
+      int holderSize = _capacity << 1;
+      _keyValueHolder = new int[holderSize];
+      _mask = holderSize - 1;
+      _maxNumEntries = (int) (_capacity * LOAD_FACTOR);
+    }
+
+    public int size() {
+      return _size;
+    }
+
+    /**
+     * Returns the group id for the given raw key. Create a new group id if 
the raw key does not exist and the group id
+     * upper bound is not reached.
+     */
+    public int getGroupId(int rawKey, int groupIdUpperBound) {
+      // NOTE: Key 0 is reserved as the null key. Use (rawKey + 1) as the 
internal key because rawKey can never be -1.
+      int internalKey = rawKey + 1;
+      int index = (HashCommon.mix(internalKey) << 1) & _mask;
+      int key = _keyValueHolder[index];
+
+      // Handle hash hit separately for better performance
+      if (key == internalKey) {
+        return _keyValueHolder[index + 1];
+      }
+      if (key == 0) {
+        return _size < groupIdUpperBound ? addNewGroup(internalKey, index) : 
INVALID_ID;
+      }
+
+      // Hash collision
+      while (true) {
+        index = (index + 2) & _mask;
+        key = _keyValueHolder[index];
+        if (key == internalKey) {
+          return _keyValueHolder[index + 1];
+        }
+        if (key == 0) {
+          return _size < groupIdUpperBound ? addNewGroup(internalKey, index) : 
INVALID_ID;
+        }
+      }
+    }
+
+    private int addNewGroup(int internalKey, int index) {
+      int groupId = _size++;
+      _keyValueHolder[index] = internalKey;
+      _keyValueHolder[index + 1] = groupId;
+      if (_size > _maxNumEntries) {
+        expand();
+      }
+      return groupId;
+    }
+
+    private void expand() {
+      _capacity <<= 1;
+      int holderSize = _capacity << 1;
+      int[] oldKeyValueHolder = _keyValueHolder;
+      _keyValueHolder = new int[holderSize];
+      _mask = holderSize - 1;
+      _maxNumEntries <<= 1;
+      int oldIndex = 0;
+      for (int i = 0; i < _size; i++) {
+        while (oldKeyValueHolder[oldIndex] == 0) {
+          oldIndex += 2;
+        }
+        int key = oldKeyValueHolder[oldIndex];
+        int value = oldKeyValueHolder[oldIndex + 1];
+        int newIndex = (HashCommon.mix(key) << 1) & _mask;
+        if (_keyValueHolder[newIndex] != 0) {
+          do {
+            newIndex = (newIndex + 2) & _mask;
+          } while (_keyValueHolder[newIndex] != 0);
+        }
+        _keyValueHolder[newIndex] = key;
+        _keyValueHolder[newIndex + 1] = value;
+        oldIndex += 2;
+      }
+    }
+
+    public Iterator<Entry> iterator() {
+      return new Iterator<Entry>() {
+        private final Entry _entry = new Entry();
+        private int _index;
+        private int _numRemainingEntries = _size;
+
+        @Override
+        public boolean hasNext() {
+          return _numRemainingEntries > 0;
+        }
+
+        @Override
+        public Entry next() {
+          int key;
+          while ((key = _keyValueHolder[_index]) == 0) {
+            _index += 2;
+          }
+          _entry._rawKey = key - 1;
+          _entry._groupId = _keyValueHolder[_index + 1];
+          _index += 2;
+          _numRemainingEntries--;
+          return _entry;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
+
+    /**
+     * Clears the map and trims the map if the size is larger than the {@link 
#MAX_CACHING_MAP_SIZE}.
+     */
+    public void clear() {
+      if (_size == 0) {
+        return;
+      }
+      if (_size <= MAX_CACHING_MAP_SIZE) {
+        Arrays.fill(_keyValueHolder, 0);
+      } else {
+        _capacity = 1 << 10;

Review comment:
       (nit) This is the same as the initialization in the constructor. Consider 
creating an init() function and moving this code into that function.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to