This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new a3ebd0c Use thread local for groupby raw key holders (#5419) a3ebd0c is described below commit a3ebd0c5981d692185d7c79a2ff1e8d682b5f069 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Wed May 20 20:50:58 2020 -0700 Use thread local for groupby raw key holders (#5419) * Use thread local for groupby raw key holders * Adding more optimization for map initial size and discard size --- .../groupby/DefaultGroupByExecutor.java | 6 +- .../groupby/DictionaryBasedGroupKeyGenerator.java | 66 ++++++++++++++++++++-- .../DictionaryBasedGroupKeyGeneratorTest.java | 28 ++++++--- 3 files changed, 84 insertions(+), 16 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java index a2d561a..f73704d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.query.aggregation.groupby; +import java.util.HashMap; import java.util.Map; import org.apache.pinot.common.request.transform.TransformExpressionTree; import org.apache.pinot.core.common.BlockValSet; @@ -45,6 +46,9 @@ public class DefaultGroupByExecutor implements GroupByExecutor { private static final ThreadLocal<int[][]> THREAD_LOCAL_MV_GROUP_KEYS = ThreadLocal.withInitial(() -> new int[DocIdSetPlanNode.MAX_DOC_PER_CALL][]); + // Thread local (reusable) hashMap as holder for group keys + private static final ThreadLocal<Map> THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS = + ThreadLocal.withInitial(() -> new HashMap()); protected final AggregationFunction[] _aggregationFunctions; protected final GroupKeyGenerator _groupKeyGenerator; protected final GroupByResultHolder[] 
_groupByResultHolders; @@ -86,7 +90,7 @@ public class DefaultGroupByExecutor implements GroupByExecutor { } } else { _groupKeyGenerator = new DictionaryBasedGroupKeyGenerator(transformOperator, groupByExpressions, numGroupsLimit, - maxInitialResultHolderCapacity); + maxInitialResultHolderCapacity, THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get()); } // Initialize result holders diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java index c163ffd..4ce030a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java @@ -26,6 +26,7 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import it.unimi.dsi.fastutil.objects.ObjectIterator; import java.util.Iterator; +import java.util.Map; import java.util.NoSuchElementException; import javax.annotation.Nonnull; import org.apache.pinot.common.request.transform.TransformExpressionTree; @@ -61,7 +62,8 @@ import org.apache.pinot.core.segment.index.readers.Dictionary; * bounded by the number of groups limit (globalGroupIdUpperBound is always smaller or equal to numGroupsLimit). 
*/ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { - private final static int DEFAULT_HASH_MAP_INITIAL_SIZE = 16; + private final static int INITIAL_MAP_SIZE = 256; + private final static int MAX_CACHING_MAP_SIZE = 1048576; private final TransformExpressionTree[] _groupByExpressions; private final int _numGroupByExpressions; private final int[] _cardinalities; @@ -78,7 +80,8 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { private final RawKeyHolder _rawKeyHolder; public DictionaryBasedGroupKeyGenerator(TransformOperator transformOperator, - TransformExpressionTree[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold) { + TransformExpressionTree[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold, + Map mapBasedRawKeyHolders) { assert numGroupsLimit >= arrayBasedThreshold; _groupByExpressions = groupByExpressions; @@ -107,18 +110,32 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { _isSingleValueColumn[i] = transformOperator.getResultMetadata(groupByExpression).isSingleValue(); } - if (longOverflow) { _globalGroupIdUpperBound = numGroupsLimit; - _rawKeyHolder = new ArrayMapBasedHolder(DEFAULT_HASH_MAP_INITIAL_SIZE); + Object mapInternal = mapBasedRawKeyHolders.computeIfAbsent(ArrayMapBasedHolder.class.getName(), + o -> new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); + _rawKeyHolder = new ArrayMapBasedHolder(mapInternal); + if (((Object2IntOpenHashMap)mapInternal).size() > MAX_CACHING_MAP_SIZE) { + mapBasedRawKeyHolders.put(ArrayMapBasedHolder.class.getName(), new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); + } } else { if (cardinalityProduct > Integer.MAX_VALUE) { _globalGroupIdUpperBound = numGroupsLimit; - _rawKeyHolder = new LongMapBasedHolder(DEFAULT_HASH_MAP_INITIAL_SIZE); + Object mapInternal = mapBasedRawKeyHolders.computeIfAbsent(LongMapBasedHolder.class.getName(), + o -> new 
LongMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); + _rawKeyHolder = new LongMapBasedHolder(mapInternal); + if (((Long2IntOpenHashMap)mapInternal).size() > MAX_CACHING_MAP_SIZE) { + mapBasedRawKeyHolders.put(ArrayMapBasedHolder.class.getName(), new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); + } } else { _globalGroupIdUpperBound = Math.min((int) cardinalityProduct, numGroupsLimit); if (cardinalityProduct > arrayBasedThreshold) { - _rawKeyHolder = new IntMapBasedHolder(DEFAULT_HASH_MAP_INITIAL_SIZE); + Object mapInternal = mapBasedRawKeyHolders.computeIfAbsent(IntMapBasedHolder.class.getName(), + o -> new IntMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); + _rawKeyHolder = new IntMapBasedHolder(mapInternal); + if (((Int2IntOpenHashMap)mapInternal).size() > MAX_CACHING_MAP_SIZE) { + mapBasedRawKeyHolders.put(ArrayMapBasedHolder.class.getName(), new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal()); + } } else { _rawKeyHolder = new ArrayBasedHolder(); } @@ -191,6 +208,8 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { * @return Upper bound of group id inside the holder */ int getGroupIdUpperBound(); + + Object getInternal(); } private class ArrayBasedHolder implements RawKeyHolder { @@ -225,6 +244,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { return _globalGroupIdUpperBound; } + @Override + public Object getInternal() { + return _flags; + } + @Nonnull @Override public Iterator<GroupKey> iterator() { @@ -269,6 +293,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { _rawKeyToGroupIdMap.defaultReturnValue(INVALID_ID); } + public IntMapBasedHolder(Object hashMap) { + _rawKeyToGroupIdMap = (Int2IntOpenHashMap) hashMap; + _rawKeyToGroupIdMap.clear(); + } + @Override public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) { for (int i = 0; i < numDocs; i++) { @@ -308,6 +337,11 @@ public class DictionaryBasedGroupKeyGenerator implements 
GroupKeyGenerator { return _numGroups; } + @Override + public Object getInternal() { + return _rawKeyToGroupIdMap; + } + @Nonnull @Override public Iterator<GroupKey> iterator() { @@ -448,6 +482,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { _rawKeyToGroupIdMap.defaultReturnValue(INVALID_ID); } + public LongMapBasedHolder(Object rawKeyToGroupIdMap) { + _rawKeyToGroupIdMap = (Long2IntOpenHashMap) rawKeyToGroupIdMap; + _rawKeyToGroupIdMap.clear(); + } + @Override public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) { for (int i = 0; i < numDocs; i++) { @@ -488,6 +527,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { return _numGroups; } + @Override + public Object getInternal() { + return _rawKeyToGroupIdMap; + } + @Nonnull @Override public Iterator<GroupKey> iterator() { @@ -619,6 +663,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { _rawKeyToGroupIdMap.defaultReturnValue(INVALID_ID); } + public ArrayMapBasedHolder(Object rawKeyToGroupIdMap) { + _rawKeyToGroupIdMap = (Object2IntOpenHashMap<IntArray>) rawKeyToGroupIdMap; + _rawKeyToGroupIdMap.clear(); + } + @Override public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) { for (int i = 0; i < numDocs; i++) { @@ -659,6 +708,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { return _numGroups; } + @Override + public Object getInternal() { + return _rawKeyToGroupIdMap; + } + @Nonnull @Override public Iterator<GroupKey> iterator() { diff --git a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java index 2223c32..4fffcef 100644 --- a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java +++ 
b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java @@ -74,6 +74,8 @@ public class DictionaryBasedGroupKeyGeneratorTest { private static final String FILTER_COLUMN = "docId"; private static final String[] SV_COLUMNS = {"s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10"}; private static final String[] MV_COLUMNS = {"m1", "m2"}; + private static final ThreadLocal<Map> THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS = + ThreadLocal.withInitial(() -> new HashMap()); private final long _randomSeed = System.currentTimeMillis(); private final Random _random = new Random(_randomSeed); @@ -168,7 +170,8 @@ public class DictionaryBasedGroupKeyGeneratorTest { DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator = new DictionaryBasedGroupKeyGenerator(_transformOperator, getExpressions(groupByColumns), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, - InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, + THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get()); assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), UNIQUE_ROWS, _errorMessage); assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), UNIQUE_ROWS, _errorMessage); @@ -188,7 +191,8 @@ public class DictionaryBasedGroupKeyGeneratorTest { DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator = new DictionaryBasedGroupKeyGenerator(_transformOperator, getExpressions(groupByColumns), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, - InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, + THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get()); assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage); 
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage); @@ -209,7 +213,8 @@ public class DictionaryBasedGroupKeyGeneratorTest { DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator = new DictionaryBasedGroupKeyGenerator(_transformOperator, getExpressions(groupByColumns), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, - InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, + THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get()); assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage); assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage); @@ -230,7 +235,8 @@ public class DictionaryBasedGroupKeyGeneratorTest { DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator = new DictionaryBasedGroupKeyGenerator(_transformOperator, getExpressions(groupByColumns), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, - InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, + THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get()); assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage); assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage); @@ -265,7 +271,8 @@ public class DictionaryBasedGroupKeyGeneratorTest { DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator = new DictionaryBasedGroupKeyGenerator(_transformOperator, getExpressions(groupByColumns), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, - InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, + 
THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get()); int groupKeyUpperBound = dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(); assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), groupKeyUpperBound, _errorMessage); @@ -286,7 +293,8 @@ public class DictionaryBasedGroupKeyGeneratorTest { DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator = new DictionaryBasedGroupKeyGenerator(_transformOperator, getExpressions(groupByColumns), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, - InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, + THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get()); assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage); assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage); @@ -308,7 +316,8 @@ public class DictionaryBasedGroupKeyGeneratorTest { DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator = new DictionaryBasedGroupKeyGenerator(_transformOperator, getExpressions(groupByColumns), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, - InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, + THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get()); assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage); assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage); @@ -330,7 +339,8 @@ public class DictionaryBasedGroupKeyGeneratorTest { DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator = new DictionaryBasedGroupKeyGenerator(_transformOperator, getExpressions(groupByColumns), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, - 
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, + THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get()); assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage); assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage); @@ -350,7 +360,7 @@ public class DictionaryBasedGroupKeyGeneratorTest { // NOTE: arrayBasedThreshold must be smaller or equal to numGroupsLimit DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator = new DictionaryBasedGroupKeyGenerator(_transformOperator, getExpressions(groupByColumns), numGroupsLimit, - numGroupsLimit); + numGroupsLimit, THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get()); assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), numGroupsLimit, _errorMessage); assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org