This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b78b76fc4f Warn about numGroups above threshold (#15278) b78b76fc4f is described below commit b78b76fc4fced65da1f06a81a6da807983efe864 Author: Alberto Bastos <alberto.var...@startree.ai> AuthorDate: Thu Mar 27 10:42:22 2025 +0100 Warn about numGroups above threshold (#15278) --- .../pinot/common/utils/config/QueryOptionsUtils.java | 4 ++++ .../core/operator/query/FilteredGroupByOperator.java | 8 ++++++++ .../apache/pinot/core/operator/query/GroupByOperator.java | 8 ++++++++ .../pinot/core/plan/maker/InstancePlanMakerImplV2.java | 6 ++++++ .../pinot/core/query/request/context/QueryContext.java | 10 ++++++++++ .../java/org/apache/pinot/query/runtime/QueryRunner.java | 11 ++++++++++- .../pinot/query/runtime/operator/AggregateOperator.java | 4 ++++ .../query/runtime/operator/MultistageGroupByExecutor.java | 15 +++++++++++++++ .../query/runtime/operator/AggregateOperatorTest.java | 12 ++++++++---- .../java/org/apache/pinot/spi/utils/CommonConstants.java | 6 ++++++ 10 files changed, 79 insertions(+), 5 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index e37c980f4e..846bbf5d0c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -298,6 +298,10 @@ public class QueryOptionsUtils { return checkedParseIntPositive(QueryOptionKey.NUM_GROUPS_LIMIT, maxNumGroupLimit); } + public static Integer getNumGroupsWarningLimit(Map<String, String> queryOptions) { + String numGroupsWarningLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT); + return checkedParseIntPositive(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT, numGroupsWarningLimit); + } @Nullable public static Integer getMaxInitialResultHolderCapacity(Map<String, String> queryOptions) { String maxInitialResultHolderCapacity = queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java index 5c82827cb4..6272a40422 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java @@ -48,6 +48,8 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor; import org.apache.pinot.core.util.GroupByUtils; import org.apache.pinot.spi.trace.Tracing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -56,6 +58,7 @@ import org.apache.pinot.spi.trace.Tracing; */ @SuppressWarnings("rawtypes") public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> { + private static final Logger LOGGER = LoggerFactory.getLogger(FilteredGroupByOperator.class); private static final String EXPLAIN_NAME = "GROUP_BY_FILTERED"; private final QueryContext _queryContext; @@ -167,6 +170,11 @@ public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> { boolean numGroupsLimitReached = groupKeyGenerator.getNumKeys() >= _queryContext.getNumGroupsLimit(); Tracing.activeRecording().setNumGroups(_queryContext.getNumGroupsLimit(), groupKeyGenerator.getNumKeys()); + if (groupKeyGenerator.getNumKeys() >= _queryContext.getNumGroupsWarningLimit()) { + LOGGER.warn("numGroups reached warning limit: {} (actual: {})", + _queryContext.getNumGroupsWarningLimit(), groupKeyGenerator.getNumKeys()); + } + // Trim the groups when iff: // - Query has ORDER BY clause // - Segment group trim is enabled diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java index 6e27c6b365..33d5e4157c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java @@ -43,6 +43,8 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor; import org.apache.pinot.core.util.GroupByUtils; import org.apache.pinot.spi.trace.Tracing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -50,6 +52,7 @@ import org.apache.pinot.spi.trace.Tracing; */ @SuppressWarnings("rawtypes") public class GroupByOperator extends BaseOperator<GroupByResultsBlock> { + private static final Logger LOGGER = LoggerFactory.getLogger(GroupByOperator.class); private static final String EXPLAIN_NAME = "GROUP_BY"; private final QueryContext _queryContext; @@ -116,6 +119,11 @@ public class GroupByOperator extends BaseOperator<GroupByResultsBlock> { boolean numGroupsLimitReached = groupByExecutor.getNumGroups() >= _queryContext.getNumGroupsLimit(); Tracing.activeRecording().setNumGroups(_queryContext.getNumGroupsLimit(), groupByExecutor.getNumGroups()); + if (groupByExecutor.getNumGroups() >= _queryContext.getNumGroupsWarningLimit()) { + LOGGER.warn("numGroups reached warning limit: {} (actual: {})", + _queryContext.getNumGroupsWarningLimit(), groupByExecutor.getNumGroups()); + } + // Trim the groups when iff: // - Query has ORDER BY clause // - Segment group trim is enabled diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index 562d1936c2..511815fbb2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -110,6 +110,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker { private int _minInitialIndexedTableCapacity = Server.DEFAULT_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY; // Limit on number of groups stored for each segment, beyond which no new group will be created private int _numGroupsLimit = Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT; + // Warning limit on number of groups stored for each segment + private int _numGroupsWarningLimit = Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT; // Used for SQL GROUP BY (server combine) private int _minSegmentGroupTrimSize = Server.DEFAULT_QUERY_EXECUTOR_MIN_SEGMENT_GROUP_TRIM_SIZE; private int _minServerGroupTrimSize = Server.DEFAULT_QUERY_EXECUTOR_MIN_SERVER_GROUP_TRIM_SIZE; @@ -131,6 +133,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker { Preconditions.checkState(_minInitialIndexedTableCapacity <= _numGroupsLimit, "Invalid configuration: minInitialIndexedTableCapacity: %d must be smaller or equal to numGroupsLimit: %d", _minInitialIndexedTableCapacity, _numGroupsLimit); + _numGroupsWarningLimit = queryExecutorConfig.getProperty(Server.NUM_GROUPS_WARN_LIMIT, + Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT); _minSegmentGroupTrimSize = queryExecutorConfig.getProperty(Server.MIN_SEGMENT_GROUP_TRIM_SIZE, Server.DEFAULT_QUERY_EXECUTOR_MIN_SEGMENT_GROUP_TRIM_SIZE); _minServerGroupTrimSize = queryExecutorConfig.getProperty(Server.MIN_SERVER_GROUP_TRIM_SIZE, @@ -256,6 +260,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker { } else { queryContext.setNumGroupsLimit(_numGroupsLimit); } + // Set numGroupsWarningThreshold + queryContext.setNumGroupsWarningLimit(_numGroupsWarningLimit); // Set minSegmentGroupTrimSize Integer minSegmentGroupTrimSizeFromQuery = QueryOptionsUtils.getMinSegmentGroupTrimSize(queryOptions); if (minSegmentGroupTrimSizeFromQuery != null) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java index 9f35b1c2ef..b1144e6044 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java @@ -117,6 +117,8 @@ public class QueryContext { private int _minInitialIndexedTableCapacity = Server.DEFAULT_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY; // Limit of number of groups stored in each segment private int _numGroupsLimit = Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT; + // Warning threshold of number of groups stored in each segment + private int _numGroupsWarningLimit = Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT; // Minimum number of groups to keep per segment when trimming groups for SQL GROUP BY private int _minSegmentGroupTrimSize = Server.DEFAULT_QUERY_EXECUTOR_MIN_SEGMENT_GROUP_TRIM_SIZE; // Minimum number of groups to keep across segments when trimming groups for SQL GROUP BY @@ -384,6 +386,14 @@ public class QueryContext { _numGroupsLimit = numGroupsLimit; } + public int getNumGroupsWarningLimit() { + return _numGroupsWarningLimit; + } + + public void setNumGroupsWarningLimit(int numGroupsWarningLimit) { + _numGroupsWarningLimit = numGroupsWarningLimit; + } + public int getMinSegmentGroupTrimSize() { return _minSegmentGroupTrimSize; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 7320061c5e..fb5f340961 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -117,6 +117,8 @@ public class QueryRunner { @Nullable private Integer _numGroupsLimit; @Nullable + private Integer _numGroupsWarningLimit; + @Nullable private Integer _mseMinGroupTrimSize; @Nullable @@ -156,6 +158,9 @@ public class QueryRunner { String numGroupsLimitStr = config.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT); _numGroupsLimit = numGroupsLimitStr != null ? Integer.parseInt(numGroupsLimitStr) : null; + String numGroupsWarnLimitStr = config.getProperty(Server.CONFIG_OF_NUM_GROUPS_WARN_LIMIT); + _numGroupsWarningLimit = numGroupsWarnLimitStr != null ? Integer.parseInt(numGroupsWarnLimitStr) : null; + String mseMinGroupTrimSizeStr = config.getProperty(Server.CONFIG_OF_MSE_MIN_GROUP_TRIM_SIZE); _mseMinGroupTrimSize = mseMinGroupTrimSizeStr != null ? Integer.parseInt(mseMinGroupTrimSizeStr) : null; @@ -377,7 +382,11 @@ public class QueryRunner { opChainMetadata.putAll(requestMetadata); // 2. put all stageMetadata.customProperties. opChainMetadata.putAll(customProperties); - // 3. add all overrides from config if anything is still empty. + // 3. put some config not allowed through query options but propagated that way + if (_numGroupsWarningLimit != null) { + opChainMetadata.put(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT, Integer.toString(_numGroupsWarningLimit)); + } + // 4. add all overrides from config if anything is still empty. Integer numGroupsLimit = QueryOptionsUtils.getNumGroupsLimit(opChainMetadata); if (numGroupsLimit == null) { numGroupsLimit = _numGroupsLimit; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java index 7d1c7a7e31..64d84f2720 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -236,6 +236,10 @@ public class AggregateOperator extends MultiStageOperator { _input.earlyTerminate(); } } + if (_groupByExecutor.getNumGroups() >= _groupByExecutor.getNumGroupsWarningLimit()) { + LOGGER.warn("numGroups reached warning limit: {} (actual: {})", + _groupByExecutor.getNumGroupsWarningLimit(), _groupByExecutor.getNumGroups()); + } return dataBlock; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java index d1d5029496..1cd26d2574 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java @@ -61,6 +61,7 @@ public class MultistageGroupByExecutor { private final boolean _leafReturnFinalResult; private final DataSchema _resultSchema; private final int _numGroupsLimit; + private final int _numGroupsWarningLimit; private final boolean _filteredAggregationsSkipEmptyGroups; // Group By Result holders for each mode @@ -85,6 +86,7 @@ public class MultistageGroupByExecutor { int maxInitialResultHolderCapacity = getResolvedMaxInitialResultHolderCapacity(opChainMetadata, nodeHint); _numGroupsLimit = getNumGroupsLimit(opChainMetadata, nodeHint); + _numGroupsWarningLimit = getNumGroupsWarningLimit(opChainMetadata); // By default, we compute all groups for SQL compliant results. However, we allow overriding this behavior via // query option for improved performance. @@ -122,6 +124,11 @@ public class MultistageGroupByExecutor { return numGroupsLimit != null ? numGroupsLimit : Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT; } + private int getNumGroupsWarningLimit(Map<String, String> opChainMetadata) { + Integer numGroupsWarningLimit = QueryOptionsUtils.getNumGroupsWarningLimit(opChainMetadata); + return numGroupsWarningLimit != null ? numGroupsWarningLimit : Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT; + } + private int getResolvedMaxInitialResultHolderCapacity(Map<String, String> opChainMetadata, @Nullable PlanNode.NodeHint nodeHint) { Integer mseMaxInitialResultHolderCapacity = getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint); @@ -166,6 +173,10 @@ public class MultistageGroupByExecutor { return _numGroupsLimit; } + public int getNumGroupsWarningLimit() { + return _numGroupsWarningLimit; + } + /** * Performs group-by aggregation for the data in the block. */ @@ -277,6 +288,10 @@ public class MultistageGroupByExecutor { } } + public int getNumGroups() { + return _groupIdGenerator.getNumGroups(); + } + public boolean isNumGroupsLimitReached() { return _groupIdGenerator.getNumGroups() == _numGroupsLimit; } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java index 23ccdc2ce3..9bea990d7e 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.runtime.operator; +import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -255,7 +256,9 @@ public class AggregateOperatorTest { .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{3, 3.0})) .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE}); - AggregateOperator operator = getOperator(resultSchema, aggCalls, filterArgs, groupKeys, nodeHint); + Map<String, String> opChainMetadata = new HashMap<>(); + opChainMetadata.put(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT, "1"); + AggregateOperator operator = getOperator(resultSchema, aggCalls, filterArgs, groupKeys, nodeHint, opChainMetadata); // When: TransferableBlock block1 = operator.nextBlock(); @@ -315,14 +318,15 @@ public class AggregateOperatorTest { } private AggregateOperator getOperator(DataSchema resultSchema, List<RexExpression.FunctionCall> aggCalls, - List<Integer> filterArgs, List<Integer> groupKeys, PlanNode.NodeHint nodeHint) { - return new AggregateOperator(OperatorTestUtil.getTracingContext(), _input, + List<Integer> filterArgs, List<Integer> groupKeys, PlanNode.NodeHint nodeHint, + Map<String, String> opChainMetadata) { + return new AggregateOperator(OperatorTestUtil.getContext(opChainMetadata), _input, new AggregateNode(-1, resultSchema, nodeHint, List.of(), aggCalls, filterArgs, groupKeys, AggType.DIRECT, false, null, 0)); } private AggregateOperator getOperator(DataSchema resultSchema, List<RexExpression.FunctionCall> aggCalls, List<Integer> filterArgs, List<Integer> groupKeys) { - return getOperator(resultSchema, aggCalls, filterArgs, groupKeys, PlanNode.NodeHint.EMPTY); + return getOperator(resultSchema, aggCalls, filterArgs, groupKeys, PlanNode.NodeHint.EMPTY, Map.of()); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index bc9042738b..554dfc9037 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -542,6 +542,8 @@ public class CommonConstants { public static final String ERROR_ON_NUM_GROUPS_LIMIT = "errorOnNumGroupsLimit"; public static final String NUM_GROUPS_LIMIT = "numGroupsLimit"; + // Not actually accepted as Query Option but faked as one during MSE + public static final String NUM_GROUPS_WARNING_LIMIT = "numGroupsWarningLimit"; public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "maxInitialResultHolderCapacity"; public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY = "minInitialIndexedTableCapacity"; public static final String MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY = "mseMaxInitialResultHolderCapacity"; @@ -843,6 +845,10 @@ public class CommonConstants { public static final String CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT = QUERY_EXECUTOR_CONFIG_PREFIX + "." + NUM_GROUPS_LIMIT; public static final int DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT = 100_000; + public static final String NUM_GROUPS_WARN_LIMIT = "num.groups.warn.limit"; + public static final String CONFIG_OF_NUM_GROUPS_WARN_LIMIT = + QUERY_EXECUTOR_CONFIG_PREFIX + "." + NUM_GROUPS_WARN_LIMIT; + public static final int DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT = 150_000; public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "max.init.group.holder.capacity"; public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY = QUERY_EXECUTOR_CONFIG_PREFIX + "." + MAX_INITIAL_RESULT_HOLDER_CAPACITY; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org