This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 1a244449bc fix numGroups metric and add metric for warnings (#15280) 1a244449bc is described below commit 1a244449bc8fdd3966701c14d67501f0e01aba65 Author: Alberto Bastos <alberto.var...@startree.ai> AuthorDate: Fri Mar 28 10:30:49 2025 +0100 fix numGroups metric and add metric for warnings (#15280) --- .../apache/pinot/common/metrics/ServerMeter.java | 12 ++++++++---- .../operator/query/FilteredGroupByOperator.java | 6 ++++++ .../pinot/core/operator/query/GroupByOperator.java | 6 ++++++ .../tests/JmxMetricsIntegrationTest.java | 22 ++++++++++++++++++++++ .../apache/pinot/query/runtime/QueryRunner.java | 2 +- .../query/runtime/operator/MultiStageOperator.java | 14 ++++---------- .../apache/pinot/spi/utils/CommonConstants.java | 2 +- 7 files changed, 48 insertions(+), 16 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 35996ebb0b..e3c94bda96 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -143,12 +143,16 @@ public enum ServerMeter implements AbstractMetrics.Meter { HASH_JOIN_TIMES_MAX_ROWS_REACHED("times", true), /** * Number of times the max number of groups has been reached. - * It is increased at most one by one each time per stage. - * That means that if a stage has 10 workers and all of them reach the limit, this will be increased by 1. - * But if a single query has 2 different aggregate operators and each one reaches the limit, this will be increased - * by 2. + * It is increased in one by each worker that reaches the limit within the stage. + * That means that if a stage has 10 workers and all of them reach the limit, this will be increased by 10. 
*/ AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED("times", true), + /** + * Number of times the warning threshold for number of groups has been reached. + * It is increased in one by each worker that reaches the limit within the stage. + * That means that if a stage has 10 workers and all of them reach the limit, this will be increased by 10. + */ + AGGREGATE_TIMES_NUM_GROUPS_WARNING_LIMIT_REACHED("times", true), /** * The number of blocks that have been sent to the next stage without being serialized. * This is the sum of all blocks sent by all workers in the stage. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java index af65d90c60..c17e64d8de 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java @@ -25,6 +25,8 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FilterContext; import org.apache.pinot.common.utils.DataSchema; @@ -168,12 +170,16 @@ public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> { // Check if the groups limit is reached boolean numGroupsLimitReached = groupKeyGenerator.getNumKeys() >= _queryContext.getNumGroupsLimit(); + if (numGroupsLimitReached) { + ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED, 1); + } Tracing.activeRecording().setNumGroups(_queryContext.getNumGroupsLimit(), groupKeyGenerator.getNumKeys()); boolean numGroupsWarningLimitReached = groupKeyGenerator.getNumKeys() >= 
_queryContext.getNumGroupsWarningLimit(); if (numGroupsWarningLimitReached) { LOGGER.warn("numGroups reached warning limit: {} (actual: {})", _queryContext.getNumGroupsWarningLimit(), groupKeyGenerator.getNumKeys()); + ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_WARNING_LIMIT_REACHED, 1); } // Trim the groups when iff: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java index b4758848cd..33072d5fd4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.Operator; @@ -117,12 +119,16 @@ public class GroupByOperator extends BaseOperator<GroupByResultsBlock> { // Check if the groups limit is reached boolean numGroupsLimitReached = groupByExecutor.getNumGroups() >= _queryContext.getNumGroupsLimit(); + if (numGroupsLimitReached) { + ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED, 1); + } Tracing.activeRecording().setNumGroups(_queryContext.getNumGroupsLimit(), groupByExecutor.getNumGroups()); boolean numGroupsWarningLimitReached = groupByExecutor.getNumGroups() >= _queryContext.getNumGroupsWarningLimit(); if (numGroupsWarningLimitReached) { LOGGER.warn("numGroups reached warning limit: {} (actual: {})", _queryContext.getNumGroupsWarningLimit(), groupByExecutor.getNumGroups()); + 
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_WARNING_LIMIT_REACHED, 1); } // Trim the groups when iff: diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java index c86a38df27..a253f787fc 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java @@ -58,6 +58,7 @@ public class JmxMetricsIntegrationTest extends BaseClusterIntegrationTestSet { private static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer(); private static final String PINOT_JMX_METRICS_DOMAIN = "\"org.apache.pinot.common.metrics\""; private static final String BROKER_METRICS_TYPE = "\"BrokerMetrics\""; + private static final String SERVER_METRICS_TYPE = "\"ServerMetrics\""; @BeforeClass public void setUp() throws Exception { @@ -153,6 +154,22 @@ public class JmxMetricsIntegrationTest extends BaseClusterIntegrationTestSet { assertEquals((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric, "Count"), 6L); } + @Test(dataProvider = "useBothQueryEngines") + public void testNumGroupsLimitMetrics(boolean useMultiStageEngine) throws Exception { + ObjectName aggregateTimesNumGroupsLimitReachedMetric = new ObjectName(PINOT_JMX_METRICS_DOMAIN, + new Hashtable<>(Map.of("type", SERVER_METRICS_TYPE, + "name", "\"pinot.server.aggregateTimesNumGroupsLimitReached\""))); + + ObjectName aggregateTimesNumGroupsWarningLimitReachedMetric = new ObjectName(PINOT_JMX_METRICS_DOMAIN, + new Hashtable<>(Map.of("type", SERVER_METRICS_TYPE, + "name", "\"pinot.server.aggregateTimesNumGroupsWarningLimitReached\""))); + + postQuery("SET useMultiStageEngine=" + useMultiStageEngine + ";SET numGroupsLimit=100;" + + "SELECT DestState, Dest, 
count(*) FROM mytable GROUP BY DestState, Dest"); + assertTrue((Long) MBEAN_SERVER.getAttribute(aggregateTimesNumGroupsLimitReachedMetric, "Count") > 0L); + assertTrue((Long) MBEAN_SERVER.getAttribute(aggregateTimesNumGroupsWarningLimitReachedMetric, "Count") > 0L); + } + @Test public void testEstimatedMseServerThreadsBrokerMetric() throws Exception { ObjectName estimatedMseServerThreadsMetric = new ObjectName(PINOT_JMX_METRICS_DOMAIN, @@ -189,4 +206,9 @@ public class JmxMetricsIntegrationTest extends BaseClusterIntegrationTestSet { protected void overrideBrokerConf(PinotConfiguration brokerConf) { brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC, "true"); } + + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + serverConf.setProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT, 1); + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index fb5f340961..5c2c4e2cc1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -158,7 +158,7 @@ public class QueryRunner { String numGroupsLimitStr = config.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT); _numGroupsLimit = numGroupsLimitStr != null ? Integer.parseInt(numGroupsLimitStr) : null; - String numGroupsWarnLimitStr = config.getProperty(Server.CONFIG_OF_NUM_GROUPS_WARN_LIMIT); + String numGroupsWarnLimitStr = config.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT); _numGroupsWarningLimit = numGroupsWarnLimitStr != null ? 
Integer.parseInt(numGroupsWarnLimitStr) : null; String mseMinGroupTrimSizeStr = config.getProperty(Server.CONFIG_OF_MSE_MIN_GROUP_TRIM_SIZE); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index 52b279ddeb..ccf107d197 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -221,16 +221,10 @@ public abstract class MultiStageOperator response.mergeMaxRowsInOperator(stats.getLong(AggregateOperator.StatKey.EMITTED_ROWS)); } - @Override - public void updateServerMetrics(StatMap<?> map, ServerMetrics serverMetrics) { - super.updateServerMetrics(map, serverMetrics); - @SuppressWarnings("unchecked") - StatMap<AggregateOperator.StatKey> stats = (StatMap<AggregateOperator.StatKey>) map; - boolean limitReached = stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED); - if (limitReached) { - serverMetrics.addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED, 1); - } - } + /// So far these keys do not need to be modified from here because they are incremented on a per-worker basis: + /// ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED + /// ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_WARNING_LIMIT_REACHED + /// public void updateServerMetrics(StatMap<?> map, ServerMetrics serverMetrics); }, FILTER(FilterOperator.StatKey.class) { @Override diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 554dfc9037..1151a78077 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -846,7 +846,7 @@ public class 
CommonConstants { QUERY_EXECUTOR_CONFIG_PREFIX + "." + NUM_GROUPS_LIMIT; public static final int DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT = 100_000; public static final String NUM_GROUPS_WARN_LIMIT = "num.groups.warn.limit"; - public static final String CONFIG_OF_NUM_GROUPS_WARN_LIMIT = + public static final String CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT = QUERY_EXECUTOR_CONFIG_PREFIX + "." + NUM_GROUPS_WARN_LIMIT; public static final int DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT = 150_000; public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "max.init.group.holder.capacity"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org