This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 465c811675 Introducing MSE result holder config to minimize rehashing for high cardinality group by (#14981) 465c811675 is described below commit 465c811675fedd8fd0b9834e3309887cc23c8409 Author: Shaurya Chaturvedi <shauryach...@users.noreply.github.com> AuthorDate: Thu Feb 6 09:05:12 2025 -0800 Introducing MSE result holder config to minimize rehashing for high cardinality group by (#14981) --- .../common/utils/config/QueryOptionsUtils.java | 6 +++++ .../pinot/calcite/rel/hint/PinotHintOptions.java | 1 + .../apache/pinot/query/runtime/QueryRunner.java | 17 +++++++++++++ .../operator/MultistageGroupByExecutor.java | 29 ++++++++++++++++++++-- .../operator/groupby/GroupIdGeneratorFactory.java | 18 ++++++++------ .../groupby/MultiKeysGroupIdGenerator.java | 5 ++-- .../groupby/OneDoubleKeyGroupIdGenerator.java | 4 +-- .../groupby/OneFloatKeyGroupIdGenerator.java | 4 +-- .../groupby/OneIntKeyGroupIdGenerator.java | 4 +-- .../groupby/OneLongKeyGroupIdGenerator.java | 4 +-- .../groupby/OneObjectKeyGroupIdGenerator.java | 4 +-- .../operator/groupby/TwoKeysGroupIdGenerator.java | 5 ++-- .../apache/pinot/spi/utils/CommonConstants.java | 3 +++ 13 files changed, 80 insertions(+), 24 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 5e8ba86643..74501c5458 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -308,6 +308,12 @@ public class QueryOptionsUtils { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY)); } + @Nullable + public static Integer getMSEMaxInitialResultHolderCapacity(Map<String, String> queryOptions) { + String maxInitialCapacity = queryOptions.get(QueryOptionKey.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + return checkedParseIntPositive(QueryOptionKey.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY, maxInitialCapacity); + } + @Nullable public static Integer getMinInitialIndexedTableCapacity(Map<String, String> queryOptions) { String minInitialIndexedTableCapacity = queryOptions.get(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java index 5a711ed669..f9b3f61cf1 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java @@ -62,6 +62,7 @@ public class PinotHintOptions { public static final String GROUP_TRIM_SIZE = "group_trim_size"; public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "max_initial_result_holder_capacity"; + public static final String MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY = "mse_max_initial_result_holder_capacity"; } public static class WindowHintOptions { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 876306352b..754f605f4c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -115,6 +115,8 @@ public class QueryRunner { private Integer _maxInitialResultHolderCapacity; @Nullable private Integer _minInitialIndexedTableCapacity; + @Nullable + private Integer _mseMaxInitialResultHolderCapacity; // Join overflow settings @Nullable @@ -158,6 +160,12 @@ public class QueryRunner { _minInitialIndexedTableCapacity = minInitialIndexedTableCapacityStr != null ? Integer.parseInt(minInitialIndexedTableCapacityStr) : null; + + String mseMaxInitialGroupHolderCapacity = + config.getProperty(CommonConstants.Server.CONFIG_OF_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + _mseMaxInitialResultHolderCapacity = + mseMaxInitialGroupHolderCapacity != null ? Integer.parseInt(mseMaxInitialGroupHolderCapacity) : null; + String maxRowsInJoinStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN); _maxRowsInJoin = maxRowsInJoinStr != null ? Integer.parseInt(maxRowsInJoinStr) : null; @@ -377,6 +385,15 @@ public class QueryRunner { Integer.toString(minInitialIndexedTableCapacity)); } + Integer mseMaxInitialResultHolderCapacity = QueryOptionsUtils.getMSEMaxInitialResultHolderCapacity(opChainMetadata); + if (mseMaxInitialResultHolderCapacity == null) { + mseMaxInitialResultHolderCapacity = _mseMaxInitialResultHolderCapacity; + } + if (mseMaxInitialResultHolderCapacity != null) { + opChainMetadata.put(QueryOptionKey.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY, + Integer.toString(mseMaxInitialResultHolderCapacity)); + } + Integer maxRowsInJoin = QueryOptionsUtils.getMaxRowsInJoin(opChainMetadata); if (maxRowsInJoin == null) { maxRowsInJoin = _maxRowsInJoin; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java index e37798df08..4fa3619c51 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java @@ -87,7 +87,9 @@ public class MultistageGroupByExecutor { _aggType = aggType; _leafReturnFinalResult = leafReturnFinalResult; _resultSchema = resultSchema; - int maxInitialResultHolderCapacity = getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint); + + int maxInitialResultHolderCapacity = getResolvedMaxInitialResultHolderCapacity(opChainMetadata, nodeHint); + _numGroupsLimit = getNumGroupsLimit(opChainMetadata, nodeHint); // By default, we compute all groups for SQL compliant results. However, we allow overriding this behavior via @@ -109,7 +111,7 @@ public class MultistageGroupByExecutor { _groupIdGenerator = GroupIdGeneratorFactory.getGroupIdGenerator(_resultSchema.getStoredColumnDataTypes(), groupKeyIds.length, - _numGroupsLimit); + _numGroupsLimit, maxInitialResultHolderCapacity); } private int getNumGroupsLimit(Map<String, String> opChainMetadata, @Nullable PlanNode.NodeHint nodeHint) { @@ -126,6 +128,13 @@ public class MultistageGroupByExecutor { return numGroupsLimit != null ? numGroupsLimit : InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT; } + private int getResolvedMaxInitialResultHolderCapacity(Map<String, String> opChainMetadata, + @Nullable PlanNode.NodeHint nodeHint) { + Integer mseMaxInitialResultHolderCapacity = getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint); + return (mseMaxInitialResultHolderCapacity != null) ? mseMaxInitialResultHolderCapacity + : getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint); + } + private int getMaxInitialResultHolderCapacity(Map<String, String> opChainMetadata, @Nullable PlanNode.NodeHint nodeHint) { if (nodeHint != null) { @@ -143,6 +152,22 @@ public class MultistageGroupByExecutor { : InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY; } + private Integer getMSEMaxInitialResultHolderCapacity(Map<String, String> opChainMetadata, + @Nullable PlanNode.NodeHint nodeHint) { + if (nodeHint != null) { + Map<String, String> aggregateOptions = nodeHint.getHintOptions().get(PinotHintOptions.AGGREGATE_HINT_OPTIONS); + if (aggregateOptions != null) { + String maxInitialMSEResultHolderCapacityStr = + aggregateOptions.get(PinotHintOptions.AggregateOptions.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + if (maxInitialMSEResultHolderCapacityStr != null) { + return Integer.parseInt(maxInitialMSEResultHolderCapacityStr); + } + } + } + // Don't return default value since null value means we need to fallback to MaxInitialResultHolderCapacity + return QueryOptionsUtils.getMSEMaxInitialResultHolderCapacity(opChainMetadata); + } + public int getNumGroupsLimit() { return _numGroupsLimit; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java index 16be037f38..a254c37bf8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java @@ -25,24 +25,26 @@ public class GroupIdGeneratorFactory { private GroupIdGeneratorFactory() { } - public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns, int numGroupsLimit) { + public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns, + int numGroupsLimit, int maxInitialResultHolderCapacity) { + int initialCapacity = Math.min(maxInitialResultHolderCapacity, numGroupsLimit); if (numKeyColumns == 1) { switch (keyTypes[0]) { case INT: - return new OneIntKeyGroupIdGenerator(numGroupsLimit); + return new OneIntKeyGroupIdGenerator(numGroupsLimit, initialCapacity); case LONG: - return new OneLongKeyGroupIdGenerator(numGroupsLimit); + return new OneLongKeyGroupIdGenerator(numGroupsLimit, initialCapacity); case FLOAT: - return new OneFloatKeyGroupIdGenerator(numGroupsLimit); + return new OneFloatKeyGroupIdGenerator(numGroupsLimit, initialCapacity); case DOUBLE: - return new OneDoubleKeyGroupIdGenerator(numGroupsLimit); + return new OneDoubleKeyGroupIdGenerator(numGroupsLimit, initialCapacity); default: - return new OneObjectKeyGroupIdGenerator(numGroupsLimit); + return new OneObjectKeyGroupIdGenerator(numGroupsLimit, initialCapacity); } } else if (numKeyColumns == 2) { - return new TwoKeysGroupIdGenerator(keyTypes[0], keyTypes[1], numGroupsLimit); + return new TwoKeysGroupIdGenerator(keyTypes[0], keyTypes[1], numGroupsLimit, initialCapacity); } else { - return new MultiKeysGroupIdGenerator(keyTypes, numKeyColumns, numGroupsLimit); + return new MultiKeysGroupIdGenerator(keyTypes, numKeyColumns, numGroupsLimit, initialCapacity); } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java index 30019746b4..9a31aae3ef 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java @@ -32,8 +32,9 @@ public class MultiKeysGroupIdGenerator implements GroupIdGenerator { private final ValueToIdMap[] _keyToIdMaps; private final int _numGroupsLimit; - public MultiKeysGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns, int numGroupsLimit) { - _groupIdMap = new Object2IntOpenHashMap<>(); + public MultiKeysGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns, + int numGroupsLimit, int initialCapacity) { + _groupIdMap = new Object2IntOpenHashMap<>(initialCapacity); _groupIdMap.defaultReturnValue(INVALID_ID); _keyToIdMaps = new ValueToIdMap[numKeyColumns]; for (int i = 0; i < numKeyColumns; i++) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java index cf3f920d22..34392a2825 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java @@ -31,8 +31,8 @@ public class OneDoubleKeyGroupIdGenerator implements GroupIdGenerator { private int _numGroups = 0; private int _nullGroupId = INVALID_ID; - public OneDoubleKeyGroupIdGenerator(int numGroupsLimit) { - _groupIdMap = new Double2IntOpenHashMap(); + public OneDoubleKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) { + _groupIdMap = new Double2IntOpenHashMap(initialCapacity); _groupIdMap.defaultReturnValue(INVALID_ID); _numGroupsLimit = numGroupsLimit; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java index 5d3005dc6b..675bda6975 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java @@ -30,8 +30,8 @@ public class OneFloatKeyGroupIdGenerator implements GroupIdGenerator { private int _numGroups = 0; private int _nullGroupId = INVALID_ID; - public OneFloatKeyGroupIdGenerator(int numGroupsLimit) { - _groupIdMap = new Float2IntOpenHashMap(); + public OneFloatKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) { + _groupIdMap = new Float2IntOpenHashMap(initialCapacity); _groupIdMap.defaultReturnValue(INVALID_ID); _numGroupsLimit = numGroupsLimit; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java index 77064f8b3e..703f68f0bc 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java @@ -31,8 +31,8 @@ public class OneIntKeyGroupIdGenerator implements GroupIdGenerator { private int _numGroups = 0; private int _nullGroupId = INVALID_ID; - public OneIntKeyGroupIdGenerator(int numGroupsLimit) { - _groupIdMap = new Int2IntOpenHashMap(); + public OneIntKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) { + _groupIdMap = new Int2IntOpenHashMap(initialCapacity); _groupIdMap.defaultReturnValue(INVALID_ID); _numGroupsLimit = numGroupsLimit; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java index 5862df3def..82ae52e9a9 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java @@ -31,8 +31,8 @@ public class OneLongKeyGroupIdGenerator implements GroupIdGenerator { private int _numGroups = 0; private int _nullGroupId = INVALID_ID; - public OneLongKeyGroupIdGenerator(int numGroupsLimit) { - _groupIdMap = new Long2IntOpenHashMap(); + public OneLongKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) { + _groupIdMap = new Long2IntOpenHashMap(initialCapacity); _groupIdMap.defaultReturnValue(INVALID_ID); _numGroupsLimit = numGroupsLimit; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java index e7d7bc3815..be2f82b2c3 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java @@ -28,8 +28,8 @@ public class OneObjectKeyGroupIdGenerator implements GroupIdGenerator { private final Object2IntOpenHashMap<Object> _groupIdMap; private final int _numGroupsLimit; - public OneObjectKeyGroupIdGenerator(int numGroupsLimit) { - _groupIdMap = new Object2IntOpenHashMap<>(); + public OneObjectKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) { + _groupIdMap = new Object2IntOpenHashMap<>(initialCapacity); _groupIdMap.defaultReturnValue(INVALID_ID); _numGroupsLimit = numGroupsLimit; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java index 21e8fcf144..0f31dcb4fb 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java @@ -33,8 +33,9 @@ public class TwoKeysGroupIdGenerator implements GroupIdGenerator { private final ValueToIdMap _secondKeyToIdMap; private final int _numGroupsLimit; - public TwoKeysGroupIdGenerator(ColumnDataType firstKeyType, ColumnDataType secondKeyType, int numGroupsLimit) { - _groupIdMap = new Long2IntOpenHashMap(); + public TwoKeysGroupIdGenerator(ColumnDataType firstKeyType, + ColumnDataType secondKeyType, int numGroupsLimit, int initialCapacity) { + _groupIdMap = new Long2IntOpenHashMap(initialCapacity); _groupIdMap.defaultReturnValue(INVALID_ID); _firstKeyToIdMap = ValueToIdMapFactory.get(firstKeyType.toDataType()); _secondKeyToIdMap = ValueToIdMapFactory.get(secondKeyType.toDataType()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 4032042ca7..bdf1cbed0f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -509,6 +509,7 @@ public class CommonConstants { public static final String NUM_GROUPS_LIMIT = "numGroupsLimit"; public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "maxInitialResultHolderCapacity"; public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY = "minInitialIndexedTableCapacity"; + public static final String MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY = "mseMaxInitialResultHolderCapacity"; public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold"; public static final String STAGE_PARALLELISM = "stageParallelism"; @@ -764,6 +765,8 @@ public class CommonConstants { "pinot.server.query.executor.group.trim.size"; public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY = "pinot.server.query.executor.max.init.group.holder.capacity"; + public static final String CONFIG_OF_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY = + "pinot.server.mse.max.init.group.holder.capacity"; public static final String CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY = "pinot.server.query.executor.min.init.indexed.table.capacity"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org