This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f09cc8d56 [Adaptive Server Selector] Add metrics for Stats Manager 
Queue Size (#12340)
7f09cc8d56 is described below

commit 7f09cc8d5664aa877005e37cc1219b3b10286cde
Author: Christina Li <42751784+meiha...@users.noreply.github.com>
AuthorDate: Fri Feb 23 13:34:06 2024 -0800

    [Adaptive Server Selector] Add metrics for Stats Manager Queue Size (#12340)
---
 .../broker/broker/helix/BaseBrokerStarter.java     |  2 +-
 .../AdaptiveServerSelectorTest.java                | 44 ++++++++++++++++++----
 .../apache/pinot/common/metrics/BrokerGauge.java   |  7 +++-
 .../common/reader/PinotServerDataFetcher.scala     |  2 +-
 .../routing/stats/ServerRoutingStatsManager.java   | 13 ++++++-
 .../pinot/core/transport/QueryRoutingTest.java     |  2 +-
 .../stats/ServerRoutingStatsManagerTest.java       | 39 +++++++++++++++++--
 7 files changed, 91 insertions(+), 18 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index b95b820f25..218eab54b6 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -262,7 +262,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.VERSION, 
PinotVersion.VERSION_METRIC_NAME, 1);
     BrokerMetrics.register(_brokerMetrics);
     // Set up request handling classes
-    _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf);
+    _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf, 
_brokerMetrics);
     _serverRoutingStatsManager.init();
     _routingManager = new BrokerRoutingManager(_brokerMetrics, 
_serverRoutingStatsManager, _brokerConf);
     _routingManager.init(_spectatorHelixManager);
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
index ed976b3b03..926de0699d 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
@@ -20,16 +20,21 @@ package 
org.apache.pinot.broker.routing.adaptiveserverselector;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.ExponentialMovingAverage;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -39,16 +44,39 @@ import static org.testng.Assert.assertTrue;
 
 
 public class AdaptiveServerSelectorTest {
+  private BrokerMetrics _brokerMetrics;
+
   List<String> _servers = Arrays.asList("server1", "server2", "server3", 
"server4");
   Map<String, Object> _properties = new HashMap<>();
 
+  @BeforeTest
+  public void initBrokerMetrics() {
+    // Set up metric registry and broker metrics
+    PinotConfiguration brokerConfig = new PinotConfiguration();
+    PinotMetricsRegistry metricsRegistry = 
PinotMetricUtils.getPinotMetricsRegistry(
+        brokerConfig.subset(CommonConstants.Broker.METRICS_CONFIG_PREFIX));
+    _brokerMetrics = new BrokerMetrics(
+        brokerConfig.getProperty(
+            CommonConstants.Broker.CONFIG_OF_METRICS_NAME_PREFIX,
+            CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX),
+        metricsRegistry,
+        brokerConfig.getProperty(
+            CommonConstants.Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS,
+            CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS),
+        brokerConfig.getProperty(
+            
CommonConstants.Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS,
+            Collections.emptyList()));
+    _brokerMetrics.initializeGlobalMeters();
+    BrokerMetrics.register(_brokerMetrics);
+  }
+
   @Test
   public void testAdaptiveServerSelectorFactory() {
     // Test 1: Test disabling Adaptive Server Selection .
     
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
         CommonConstants.Broker.AdaptiveServerSelector.Type.NO_OP.name());
     PinotConfiguration cfg = new PinotConfiguration(_properties);
-    ServerRoutingStatsManager serverRoutingStatsManager = new 
ServerRoutingStatsManager(cfg);
+    ServerRoutingStatsManager serverRoutingStatsManager = new 
ServerRoutingStatsManager(cfg, _brokerMetrics);
     
assertNull(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager,
 cfg));
 
     // Enable stats collection. Without this, AdaptiveServerSelectors cannot 
be used.
@@ -58,7 +86,7 @@ public class AdaptiveServerSelectorTest {
     
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
         
CommonConstants.Broker.AdaptiveServerSelector.Type.NUM_INFLIGHT_REQ.name());
     cfg = new PinotConfiguration(_properties);
-    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, 
_brokerMetrics);
     
assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager,
         cfg) instanceof NumInFlightReqSelector);
 
@@ -66,7 +94,7 @@ public class AdaptiveServerSelectorTest {
     
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
         CommonConstants.Broker.AdaptiveServerSelector.Type.LATENCY.name());
     cfg = new PinotConfiguration(_properties);
-    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, 
_brokerMetrics);
     
assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager,
         cfg) instanceof LatencySelector);
 
@@ -74,7 +102,7 @@ public class AdaptiveServerSelectorTest {
     
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
         CommonConstants.Broker.AdaptiveServerSelector.Type.HYBRID.name());
     cfg = new PinotConfiguration(_properties);
-    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, 
_brokerMetrics);
     
assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager,
         cfg) instanceof HybridSelector);
 
@@ -82,7 +110,7 @@ public class AdaptiveServerSelectorTest {
     assertThrows(IllegalArgumentException.class, () -> {
       
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE, 
"Dummy");
       PinotConfiguration config = new PinotConfiguration(_properties);
-      ServerRoutingStatsManager manager = new 
ServerRoutingStatsManager(config);
+      ServerRoutingStatsManager manager = new 
ServerRoutingStatsManager(config, _brokerMetrics);
       AdaptiveServerSelectorFactory.getAdaptiveServerSelector(manager, config);
     });
   }
@@ -91,7 +119,7 @@ public class AdaptiveServerSelectorTest {
   public void testNumInFlightReqSelector() {
     
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 true);
     PinotConfiguration cfg = new PinotConfiguration(_properties);
-    ServerRoutingStatsManager serverRoutingStatsManager = new 
ServerRoutingStatsManager(cfg);
+    ServerRoutingStatsManager serverRoutingStatsManager = new 
ServerRoutingStatsManager(cfg, _brokerMetrics);
     serverRoutingStatsManager.init();
     assertTrue(serverRoutingStatsManager.isEnabled());
     long taskCount = 0;
@@ -290,7 +318,7 @@ public class AdaptiveServerSelectorTest {
     
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
         avgInitializationVal);
     PinotConfiguration cfg = new PinotConfiguration(_properties);
-    ServerRoutingStatsManager serverRoutingStatsManager = new 
ServerRoutingStatsManager(cfg);
+    ServerRoutingStatsManager serverRoutingStatsManager = new 
ServerRoutingStatsManager(cfg, _brokerMetrics);
     serverRoutingStatsManager.init();
     assertTrue(serverRoutingStatsManager.isEnabled());
     long taskCount = 0;
@@ -430,7 +458,7 @@ public class AdaptiveServerSelectorTest {
         hybridSelectorExponent);
 
     PinotConfiguration cfg = new PinotConfiguration(_properties);
-    ServerRoutingStatsManager serverRoutingStatsManager = new 
ServerRoutingStatsManager(cfg);
+    ServerRoutingStatsManager serverRoutingStatsManager = new 
ServerRoutingStatsManager(cfg, _brokerMetrics);
     serverRoutingStatsManager.init();
     assertTrue(serverRoutingStatsManager.isEnabled());
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index 25e66eabd0..601544b9f1 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -56,7 +56,12 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
    * The cache size used by the allocator for normal arenas
    */
   NETTY_POOLED_THREADLOCALCACHE("bytes", true),
-  NETTY_POOLED_CHUNK_SIZE("bytes", true);
+  NETTY_POOLED_CHUNK_SIZE("bytes", true),
+
+  /**
+   * The queue size of ServerRoutingStatsManager main executor service.
+   */
+  ROUTING_STATS_MANAGER_QUEUE_SIZE("routingStatsManagerQueueSize", true);
 
   private final String _brokerGaugeName;
   private final String _unit;
diff --git 
a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
 
b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
index 48255233a9..1a0443203b 100644
--- 
a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
+++ 
b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
@@ -48,7 +48,7 @@ private[reader] class PinotServerDataFetcher(
   private val metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry
   private val brokerMetrics = new BrokerMetrics(metricsRegistry)
   private val pinotConfig = new PinotConfiguration()
-  private val serverRoutingStatsManager = new 
ServerRoutingStatsManager(pinotConfig)
+  private val serverRoutingStatsManager = new 
ServerRoutingStatsManager(pinotConfig, brokerMetrics)
   private val queryRouter = new QueryRouter(brokerId, brokerMetrics, 
serverRoutingStatsManager)
   // TODO add support for TLS-secured server
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
index 87c80566c1..1057fc084f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.AdaptiveServerSelector;
 import org.slf4j.Logger;
@@ -46,6 +48,7 @@ public class ServerRoutingStatsManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerRoutingStatsManager.class);
 
   private final PinotConfiguration _config;
+  private final BrokerMetrics _brokerMetrics;
   private volatile boolean _isEnabled;
   private ConcurrentHashMap<String, ServerRoutingStatsEntry> 
_serverQueryStatsMap;
 
@@ -61,8 +64,9 @@ public class ServerRoutingStatsManager {
   private double _avgInitializationVal;
   private int _hybridScoreExponent;
 
-  public ServerRoutingStatsManager(PinotConfiguration pinotConfig) {
+  public ServerRoutingStatsManager(PinotConfiguration pinotConfig, 
BrokerMetrics brokerMetrics) {
     _config = pinotConfig;
+    _brokerMetrics = brokerMetrics;
   }
 
   public void init() {
@@ -138,9 +142,9 @@ public class ServerRoutingStatsManager {
       return;
     }
 
-    // TODO: Track Executor qSize and alert if it crosses a threshold.
     _executorService.execute(() -> {
       try {
+        recordQueueSizeMetrics();
         updateStatsAfterQuerySubmission(serverInstanceId);
       } catch (Exception e) {
         LOGGER.error("Exception caught while updating stats. requestId={}, 
exception={}", requestId, e);
@@ -377,4 +381,9 @@ public class ServerRoutingStatsManager {
       stats.getServerReadLock().unlock();
     }
   }
+
+  private void recordQueueSizeMetrics() {
+    int queueSize = getQueueSize();
+    
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ROUTING_STATS_MANAGER_QUEUE_SIZE,
 queueSize);
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 07aa0e2c08..cbf4174bca 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -73,7 +73,7 @@ public class QueryRoutingTest {
     Map<String, Object> properties = new HashMap<>();
     
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 true);
     PinotConfiguration cfg = new PinotConfiguration(properties);
-    _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, 
mock(BrokerMetrics.class));
     _serverRoutingStatsManager.init();
     _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), 
_serverRoutingStatsManager);
     _requestCount = 0;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
index 73dd623b5b..84983f87ec 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
@@ -18,13 +18,18 @@
  */
 package org.apache.pinot.core.transport.server.routing.stats;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -34,20 +39,44 @@ import static org.testng.Assert.assertTrue;
 
 
 public class ServerRoutingStatsManagerTest {
+  private BrokerMetrics _brokerMetrics;
+
+  @BeforeTest
+  public void initBrokerMetrics() {
+    // Set up metric registry and broker metrics
+    PinotConfiguration brokerConfig = new PinotConfiguration();
+    PinotMetricsRegistry metricsRegistry = 
PinotMetricUtils.getPinotMetricsRegistry(
+        brokerConfig.subset(CommonConstants.Broker.METRICS_CONFIG_PREFIX));
+    _brokerMetrics = new BrokerMetrics(
+        brokerConfig.getProperty(
+            CommonConstants.Broker.CONFIG_OF_METRICS_NAME_PREFIX,
+            CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX),
+        metricsRegistry,
+        brokerConfig.getProperty(
+            CommonConstants.Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS,
+            CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS),
+        brokerConfig.getProperty(
+            
CommonConstants.Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS,
+            Collections.emptyList()));
+    _brokerMetrics.initializeGlobalMeters();
+    BrokerMetrics.register(_brokerMetrics);
+  }
+
   @Test
   public void testInitAndShutDown() {
     Map<String, Object> properties = new HashMap<>();
 
     // Test 1: Test disabled.
     
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 false);
-    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties));
+    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties),
+        _brokerMetrics);
     assertFalse(manager.isEnabled());
     manager.init();
     assertFalse(manager.isEnabled());
 
     // Test 2: Test enabled.
     
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 true);
-    manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties));
+    manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties), _brokerMetrics);
     assertFalse(manager.isEnabled());
     manager.init();
     assertTrue(manager.isEnabled());
@@ -64,7 +93,8 @@ public class ServerRoutingStatsManagerTest {
   public void testEmptyStats() {
     Map<String, Object> properties = new HashMap<>();
     
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 true);
-    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties));
+    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties),
+        _brokerMetrics);
     manager.init();
 
     List<Pair<String, Integer>> numInFlightReqList = 
manager.fetchNumInFlightRequestsForAllServers();
@@ -94,7 +124,8 @@ public class ServerRoutingStatsManagerTest {
     
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS,
 0);
     
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
 0.0);
     
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
 3);
-    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties));
+    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties),
+        _brokerMetrics);
     manager.init();
 
     int requestId = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to