This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a244449bc fix numGroups metric and add metric for warnings (#15280)
1a244449bc is described below

commit 1a244449bc8fdd3966701c14d67501f0e01aba65
Author: Alberto Bastos <alberto.var...@startree.ai>
AuthorDate: Fri Mar 28 10:30:49 2025 +0100

    fix numGroups metric and add metric for warnings (#15280)
---
 .../apache/pinot/common/metrics/ServerMeter.java   | 12 ++++++++----
 .../operator/query/FilteredGroupByOperator.java    |  6 ++++++
 .../pinot/core/operator/query/GroupByOperator.java |  6 ++++++
 .../tests/JmxMetricsIntegrationTest.java           | 22 ++++++++++++++++++++++
 .../apache/pinot/query/runtime/QueryRunner.java    |  2 +-
 .../query/runtime/operator/MultiStageOperator.java | 14 ++++----------
 .../apache/pinot/spi/utils/CommonConstants.java    |  2 +-
 7 files changed, 48 insertions(+), 16 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 35996ebb0b..e3c94bda96 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -143,12 +143,16 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   HASH_JOIN_TIMES_MAX_ROWS_REACHED("times", true),
   /**
    * Number of times the max number of groups has been reached.
-   * It is increased at most one by one each time per stage.
-   * That means that if a stage has 10 workers and all of them reach the 
limit, this will be increased by 1.
-   * But if a single query has 2 different aggregate operators and each one 
reaches the limit, this will be increased
-   * by 2.
+   * It is increased by one for each worker that reaches the limit within the 
stage.
+   * That means that if a stage has 10 workers and all of them reach the 
limit, this will be increased by 10.
    */
   AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED("times", true),
+  /**
+   * Number of times the warning threshold for number of groups has been 
reached.
+   * It is increased by one for each worker that reaches the limit within the 
stage.
+   * That means that if a stage has 10 workers and all of them reach the 
limit, this will be increased by 10.
+   */
+  AGGREGATE_TIMES_NUM_GROUPS_WARNING_LIMIT_REACHED("times", true),
   /**
    * The number of blocks that have been sent to the next stage without being 
serialized.
    * This is the sum of all blocks sent by all workers in the stage.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
index af65d90c60..c17e64d8de 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
@@ -25,6 +25,8 @@ import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.utils.DataSchema;
@@ -168,12 +170,16 @@ public class FilteredGroupByOperator extends 
BaseOperator<GroupByResultsBlock> {
 
     // Check if the groups limit is reached
     boolean numGroupsLimitReached = groupKeyGenerator.getNumKeys() >= 
_queryContext.getNumGroupsLimit();
+    if (numGroupsLimitReached) {
+      
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED,
 1);
+    }
     Tracing.activeRecording().setNumGroups(_queryContext.getNumGroupsLimit(), 
groupKeyGenerator.getNumKeys());
 
     boolean numGroupsWarningLimitReached = groupKeyGenerator.getNumKeys() >= 
_queryContext.getNumGroupsWarningLimit();
     if (numGroupsWarningLimitReached) {
       LOGGER.warn("numGroups reached warning limit: {} (actual: {})",
           _queryContext.getNumGroupsWarningLimit(), 
groupKeyGenerator.getNumKeys());
+      
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_WARNING_LIMIT_REACHED,
 1);
     }
 
     // Trim the groups when iff:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
index b4758848cd..33072d5fd4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
@@ -117,12 +119,16 @@ public class GroupByOperator extends 
BaseOperator<GroupByResultsBlock> {
 
     // Check if the groups limit is reached
     boolean numGroupsLimitReached = groupByExecutor.getNumGroups() >= 
_queryContext.getNumGroupsLimit();
+    if (numGroupsLimitReached) {
+      
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED,
 1);
+    }
     Tracing.activeRecording().setNumGroups(_queryContext.getNumGroupsLimit(), 
groupByExecutor.getNumGroups());
 
     boolean numGroupsWarningLimitReached = groupByExecutor.getNumGroups() >= 
_queryContext.getNumGroupsWarningLimit();
     if (numGroupsWarningLimitReached) {
       LOGGER.warn("numGroups reached warning limit: {} (actual: {})",
           _queryContext.getNumGroupsWarningLimit(), 
groupByExecutor.getNumGroups());
+      
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_WARNING_LIMIT_REACHED,
 1);
     }
 
     // Trim the groups when iff:
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java
index c86a38df27..a253f787fc 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java
@@ -58,6 +58,7 @@ public class JmxMetricsIntegrationTest extends 
BaseClusterIntegrationTestSet {
   private static final MBeanServer MBEAN_SERVER = 
ManagementFactory.getPlatformMBeanServer();
   private static final String PINOT_JMX_METRICS_DOMAIN = 
"\"org.apache.pinot.common.metrics\"";
   private static final String BROKER_METRICS_TYPE = "\"BrokerMetrics\"";
+  private static final String SERVER_METRICS_TYPE = "\"ServerMetrics\"";
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -153,6 +154,22 @@ public class JmxMetricsIntegrationTest extends 
BaseClusterIntegrationTestSet {
     assertEquals((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric, 
"Count"), 6L);
   }
 
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testNumGroupsLimitMetrics(boolean useMultiStageEngine) throws 
Exception {
+    ObjectName aggregateTimesNumGroupsLimitReachedMetric = new 
ObjectName(PINOT_JMX_METRICS_DOMAIN,
+        new Hashtable<>(Map.of("type", SERVER_METRICS_TYPE,
+            "name", "\"pinot.server.aggregateTimesNumGroupsLimitReached\"")));
+
+    ObjectName aggregateTimesNumGroupsWarningLimitReachedMetric = new 
ObjectName(PINOT_JMX_METRICS_DOMAIN,
+        new Hashtable<>(Map.of("type", SERVER_METRICS_TYPE,
+            "name", 
"\"pinot.server.aggregateTimesNumGroupsWarningLimitReached\"")));
+
+    postQuery("SET useMultiStageEngine=" + useMultiStageEngine + ";SET 
numGroupsLimit=100;"
+        + "SELECT DestState, Dest, count(*) FROM mytable GROUP BY DestState, 
Dest");
+    assertTrue((Long) 
MBEAN_SERVER.getAttribute(aggregateTimesNumGroupsLimitReachedMetric, "Count") > 
0L);
+    assertTrue((Long) 
MBEAN_SERVER.getAttribute(aggregateTimesNumGroupsWarningLimitReachedMetric, 
"Count") > 0L);
+  }
+
   @Test
   public void testEstimatedMseServerThreadsBrokerMetric() throws Exception {
     ObjectName estimatedMseServerThreadsMetric = new 
ObjectName(PINOT_JMX_METRICS_DOMAIN,
@@ -189,4 +206,9 @@ public class JmxMetricsIntegrationTest extends 
BaseClusterIntegrationTestSet {
   protected void overrideBrokerConf(PinotConfiguration brokerConf) {
     
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
 "true");
   }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    
serverConf.setProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT,
 1);
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index fb5f340961..5c2c4e2cc1 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -158,7 +158,7 @@ public class QueryRunner {
     String numGroupsLimitStr = 
config.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
     _numGroupsLimit = numGroupsLimitStr != null ? 
Integer.parseInt(numGroupsLimitStr) : null;
 
-    String numGroupsWarnLimitStr = 
config.getProperty(Server.CONFIG_OF_NUM_GROUPS_WARN_LIMIT);
+    String numGroupsWarnLimitStr = 
config.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT);
     _numGroupsWarningLimit = numGroupsWarnLimitStr != null ? 
Integer.parseInt(numGroupsWarnLimitStr) : null;
 
     String mseMinGroupTrimSizeStr = 
config.getProperty(Server.CONFIG_OF_MSE_MIN_GROUP_TRIM_SIZE);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 52b279ddeb..ccf107d197 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -221,16 +221,10 @@ public abstract class MultiStageOperator
         
response.mergeMaxRowsInOperator(stats.getLong(AggregateOperator.StatKey.EMITTED_ROWS));
       }
 
-      @Override
-      public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
-        super.updateServerMetrics(map, serverMetrics);
-        @SuppressWarnings("unchecked")
-        StatMap<AggregateOperator.StatKey> stats = 
(StatMap<AggregateOperator.StatKey>) map;
-        boolean limitReached = 
stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED);
-        if (limitReached) {
-          
serverMetrics.addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED,
 1);
-        }
-      }
+      /// So far these keys do not need to be modified from here because they 
are incremented on a per-worker basis:
+      /// ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED
+      /// ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_WARNING_LIMIT_REACHED
+      /// public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics);
     },
     FILTER(FilterOperator.StatKey.class) {
       @Override
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 554dfc9037..1151a78077 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -846,7 +846,7 @@ public class CommonConstants {
         QUERY_EXECUTOR_CONFIG_PREFIX + "." + NUM_GROUPS_LIMIT;
     public static final int DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT = 100_000;
     public static final String NUM_GROUPS_WARN_LIMIT = "num.groups.warn.limit";
-    public static final String CONFIG_OF_NUM_GROUPS_WARN_LIMIT =
+    public static final String CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT =
         QUERY_EXECUTOR_CONFIG_PREFIX + "." + NUM_GROUPS_WARN_LIMIT;
     public static final int DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_WARN_LIMIT = 
150_000;
     public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = 
"max.init.group.holder.capacity";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to