This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a3ebd0c  Use thread local for groupby raw key holders (#5419)
a3ebd0c is described below

commit a3ebd0c5981d692185d7c79a2ff1e8d682b5f069
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Wed May 20 20:50:58 2020 -0700

    Use thread local for groupby raw key holders (#5419)
    
    * Use thread local for groupby raw key holders
    
    * Adding more optimization for map initial size and discard size
---
 .../groupby/DefaultGroupByExecutor.java            |  6 +-
 .../groupby/DictionaryBasedGroupKeyGenerator.java  | 66 ++++++++++++++++++++--
 .../DictionaryBasedGroupKeyGeneratorTest.java      | 28 ++++++---
 3 files changed, 84 insertions(+), 16 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
index a2d561a..f73704d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby;
 
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
 import org.apache.pinot.core.common.BlockValSet;
@@ -45,6 +46,9 @@ public class DefaultGroupByExecutor implements 
GroupByExecutor {
   private static final ThreadLocal<int[][]> THREAD_LOCAL_MV_GROUP_KEYS =
       ThreadLocal.withInitial(() -> new 
int[DocIdSetPlanNode.MAX_DOC_PER_CALL][]);
 
+  // Thread local (reusable) hashMap as holder for group keys
+  private static final ThreadLocal<Map> 
THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS =
+      ThreadLocal.withInitial(() -> new HashMap());
   protected final AggregationFunction[] _aggregationFunctions;
   protected final GroupKeyGenerator _groupKeyGenerator;
   protected final GroupByResultHolder[] _groupByResultHolders;
@@ -86,7 +90,7 @@ public class DefaultGroupByExecutor implements 
GroupByExecutor {
       }
     } else {
       _groupKeyGenerator = new 
DictionaryBasedGroupKeyGenerator(transformOperator, groupByExpressions, 
numGroupsLimit,
-          maxInitialResultHolderCapacity);
+          maxInitialResultHolderCapacity, 
THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
     }
 
     // Initialize result holders
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
index c163ffd..4ce030a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
@@ -26,6 +26,7 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
@@ -61,7 +62,8 @@ import org.apache.pinot.core.segment.index.readers.Dictionary;
  * bounded by the number of groups limit (globalGroupIdUpperBound is always 
smaller or equal to numGroupsLimit).
  */
 public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
-  private final static int DEFAULT_HASH_MAP_INITIAL_SIZE = 16;
+  private final static int INITIAL_MAP_SIZE = 256;
+  private final static int MAX_CACHING_MAP_SIZE = 1048576;
   private final TransformExpressionTree[] _groupByExpressions;
   private final int _numGroupByExpressions;
   private final int[] _cardinalities;
@@ -78,7 +80,8 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator {
   private final RawKeyHolder _rawKeyHolder;
 
   public DictionaryBasedGroupKeyGenerator(TransformOperator transformOperator,
-      TransformExpressionTree[] groupByExpressions, int numGroupsLimit, int 
arrayBasedThreshold) {
+      TransformExpressionTree[] groupByExpressions, int numGroupsLimit, int 
arrayBasedThreshold,
+      Map mapBasedRawKeyHolders) {
     assert numGroupsLimit >= arrayBasedThreshold;
 
     _groupByExpressions = groupByExpressions;
@@ -107,18 +110,32 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator {
 
       _isSingleValueColumn[i] = 
transformOperator.getResultMetadata(groupByExpression).isSingleValue();
     }
-
     if (longOverflow) {
       _globalGroupIdUpperBound = numGroupsLimit;
-      _rawKeyHolder = new ArrayMapBasedHolder(DEFAULT_HASH_MAP_INITIAL_SIZE);
+      Object mapInternal = 
mapBasedRawKeyHolders.computeIfAbsent(ArrayMapBasedHolder.class.getName(),
+          o -> new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
+      _rawKeyHolder = new ArrayMapBasedHolder(mapInternal);
+      if (((Object2IntOpenHashMap)mapInternal).size() > MAX_CACHING_MAP_SIZE) {
+        mapBasedRawKeyHolders.put(ArrayMapBasedHolder.class.getName(), new 
ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
+      }
     } else {
       if (cardinalityProduct > Integer.MAX_VALUE) {
         _globalGroupIdUpperBound = numGroupsLimit;
-        _rawKeyHolder = new LongMapBasedHolder(DEFAULT_HASH_MAP_INITIAL_SIZE);
+        Object mapInternal = 
mapBasedRawKeyHolders.computeIfAbsent(LongMapBasedHolder.class.getName(),
+            o -> new LongMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
+        _rawKeyHolder = new LongMapBasedHolder(mapInternal);
+        if (((Long2IntOpenHashMap)mapInternal).size() > MAX_CACHING_MAP_SIZE) {
+          mapBasedRawKeyHolders.put(ArrayMapBasedHolder.class.getName(), new 
ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
+        }
       } else {
         _globalGroupIdUpperBound = Math.min((int) cardinalityProduct, 
numGroupsLimit);
         if (cardinalityProduct > arrayBasedThreshold) {
-          _rawKeyHolder = new IntMapBasedHolder(DEFAULT_HASH_MAP_INITIAL_SIZE);
+          Object mapInternal = 
mapBasedRawKeyHolders.computeIfAbsent(IntMapBasedHolder.class.getName(),
+              o -> new IntMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
+          _rawKeyHolder = new IntMapBasedHolder(mapInternal);
+          if (((Int2IntOpenHashMap)mapInternal).size() > MAX_CACHING_MAP_SIZE) 
{
+            mapBasedRawKeyHolders.put(ArrayMapBasedHolder.class.getName(), new 
ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
+          }
         } else {
           _rawKeyHolder = new ArrayBasedHolder();
         }
@@ -191,6 +208,8 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator {
      * @return Upper bound of group id inside the holder
      */
     int getGroupIdUpperBound();
+
+    Object getInternal();
   }
 
   private class ArrayBasedHolder implements RawKeyHolder {
@@ -225,6 +244,11 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator {
       return _globalGroupIdUpperBound;
     }
 
+    @Override
+    public Object getInternal() {
+      return _flags;
+    }
+
     @Nonnull
     @Override
     public Iterator<GroupKey> iterator() {
@@ -269,6 +293,11 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator {
       _rawKeyToGroupIdMap.defaultReturnValue(INVALID_ID);
     }
 
+    public IntMapBasedHolder(Object hashMap) {
+      _rawKeyToGroupIdMap = (Int2IntOpenHashMap) hashMap;
+      _rawKeyToGroupIdMap.clear();
+    }
+
     @Override
     public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
@@ -308,6 +337,11 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator {
       return _numGroups;
     }
 
+    @Override
+    public Object getInternal() {
+      return _rawKeyToGroupIdMap;
+    }
+
     @Nonnull
     @Override
     public Iterator<GroupKey> iterator() {
@@ -448,6 +482,11 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator {
       _rawKeyToGroupIdMap.defaultReturnValue(INVALID_ID);
     }
 
+    public LongMapBasedHolder(Object rawKeyToGroupIdMap) {
+      _rawKeyToGroupIdMap = (Long2IntOpenHashMap) rawKeyToGroupIdMap;
+      _rawKeyToGroupIdMap.clear();
+    }
+
     @Override
     public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
@@ -488,6 +527,11 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator {
       return _numGroups;
     }
 
+    @Override
+    public Object getInternal() {
+      return _rawKeyToGroupIdMap;
+    }
+
     @Nonnull
     @Override
     public Iterator<GroupKey> iterator() {
@@ -619,6 +663,11 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator {
       _rawKeyToGroupIdMap.defaultReturnValue(INVALID_ID);
     }
 
+    public ArrayMapBasedHolder(Object rawKeyToGroupIdMap) {
+      _rawKeyToGroupIdMap = (Object2IntOpenHashMap<IntArray>) 
rawKeyToGroupIdMap;
+      _rawKeyToGroupIdMap.clear();
+    }
+
     @Override
     public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
@@ -659,6 +708,11 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator {
       return _numGroups;
     }
 
+    @Override
+    public Object getInternal() {
+      return _rawKeyToGroupIdMap;
+    }
+
     @Nonnull
     @Override
     public Iterator<GroupKey> iterator() {
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
index 2223c32..4fffcef 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
@@ -74,6 +74,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
   private static final String FILTER_COLUMN = "docId";
   private static final String[] SV_COLUMNS = {"s1", "s2", "s3", "s4", "s5", 
"s6", "s7", "s8", "s9", "s10"};
   private static final String[] MV_COLUMNS = {"m1", "m2"};
+  private static final ThreadLocal<Map> 
THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS =
+      ThreadLocal.withInitial(() -> new HashMap());
 
   private final long _randomSeed = System.currentTimeMillis();
   private final Random _random = new Random(_randomSeed);
@@ -168,7 +170,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
     DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
         new DictionaryBasedGroupKeyGenerator(_transformOperator, 
getExpressions(groupByColumns),
             InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-            
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+            InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+            THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
     
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), 
UNIQUE_ROWS, _errorMessage);
     
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 
UNIQUE_ROWS, _errorMessage);
 
@@ -188,7 +191,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
     DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
         new DictionaryBasedGroupKeyGenerator(_transformOperator, 
getExpressions(groupByColumns),
             InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-            
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+            InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+            THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
     
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
         InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
     
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 
0, _errorMessage);
@@ -209,7 +213,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
     DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
         new DictionaryBasedGroupKeyGenerator(_transformOperator, 
getExpressions(groupByColumns),
             InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-            
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+            InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+            THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
     
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
         InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
     
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 
0, _errorMessage);
@@ -230,7 +235,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
     DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
         new DictionaryBasedGroupKeyGenerator(_transformOperator, 
getExpressions(groupByColumns),
             InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-            
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+            InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+            THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
     
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
         InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
     
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 
0, _errorMessage);
@@ -265,7 +271,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
     DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
         new DictionaryBasedGroupKeyGenerator(_transformOperator, 
getExpressions(groupByColumns),
             InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-            
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+            InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+            THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
     int groupKeyUpperBound = 
dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound();
     
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 
groupKeyUpperBound, _errorMessage);
 
@@ -286,7 +293,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
     DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
         new DictionaryBasedGroupKeyGenerator(_transformOperator, 
getExpressions(groupByColumns),
             InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-            
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+            InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+            THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
     
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
         InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
     
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 
0, _errorMessage);
@@ -308,7 +316,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
     DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
         new DictionaryBasedGroupKeyGenerator(_transformOperator, 
getExpressions(groupByColumns),
             InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-            
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+            InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+            THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
     
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
         InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
     
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 
0, _errorMessage);
@@ -330,7 +339,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
     DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
         new DictionaryBasedGroupKeyGenerator(_transformOperator, 
getExpressions(groupByColumns),
             InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-            
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+            InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+            THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
     
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
         InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
     
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 
0, _errorMessage);
@@ -350,7 +360,7 @@ public class DictionaryBasedGroupKeyGeneratorTest {
     // NOTE: arrayBasedThreshold must be smaller or equal to numGroupsLimit
     DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
         new DictionaryBasedGroupKeyGenerator(_transformOperator, 
getExpressions(groupByColumns), numGroupsLimit,
-            numGroupsLimit);
+            numGroupsLimit, 
THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
     
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), 
numGroupsLimit, _errorMessage);
     
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 
0, _errorMessage);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to