This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7f09cc8d56 [Adaptive Server Selector] Add metrics for Stats Manager Queue Size (#12340) 7f09cc8d56 is described below commit 7f09cc8d5664aa877005e37cc1219b3b10286cde Author: Christina Li <42751784+meiha...@users.noreply.github.com> AuthorDate: Fri Feb 23 13:34:06 2024 -0800 [Adaptive Server Selector] Add metrics for Stats Manager Queue Size (#12340) --- .../broker/broker/helix/BaseBrokerStarter.java | 2 +- .../AdaptiveServerSelectorTest.java | 44 ++++++++++++++++++---- .../apache/pinot/common/metrics/BrokerGauge.java | 7 +++- .../common/reader/PinotServerDataFetcher.scala | 2 +- .../routing/stats/ServerRoutingStatsManager.java | 13 ++++++- .../pinot/core/transport/QueryRoutingTest.java | 2 +- .../stats/ServerRoutingStatsManagerTest.java | 39 +++++++++++++++++-- 7 files changed, 91 insertions(+), 18 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index b95b820f25..218eab54b6 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -262,7 +262,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable { _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1); BrokerMetrics.register(_brokerMetrics); // Set up request handling classes - _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf); + _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf, _brokerMetrics); _serverRoutingStatsManager.init(); _routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf); _routingManager.init(_spectatorHelixManager); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java index ed976b3b03..926de0699d 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java @@ -20,16 +20,21 @@ package org.apache.pinot.broker.routing.adaptiveserverselector; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.utils.ExponentialMovingAverage; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.metrics.PinotMetricUtils; +import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -39,16 +44,39 @@ import static org.testng.Assert.assertTrue; public class AdaptiveServerSelectorTest { + private BrokerMetrics _brokerMetrics; + List<String> _servers = Arrays.asList("server1", "server2", "server3", "server4"); Map<String, Object> _properties = new HashMap<>(); + @BeforeTest + public void initBrokerMetrics() { + // Set up metric registry and broker metrics + PinotConfiguration brokerConfig = new PinotConfiguration(); + PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry( + brokerConfig.subset(CommonConstants.Broker.METRICS_CONFIG_PREFIX)); + _brokerMetrics = new BrokerMetrics( + brokerConfig.getProperty( + CommonConstants.Broker.CONFIG_OF_METRICS_NAME_PREFIX, + CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX), + metricsRegistry, + brokerConfig.getProperty( + CommonConstants.Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS, + CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS), + brokerConfig.getProperty( + CommonConstants.Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS, + Collections.emptyList())); + _brokerMetrics.initializeGlobalMeters(); + BrokerMetrics.register(_brokerMetrics); + } + @Test public void testAdaptiveServerSelectorFactory() { // Test 1: Test disabling Adaptive Server Selection . _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE, CommonConstants.Broker.AdaptiveServerSelector.Type.NO_OP.name()); PinotConfiguration cfg = new PinotConfiguration(_properties); - ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg); + ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics); assertNull(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager, cfg)); // Enable stats collection. Without this, AdaptiveServerSelectors cannot be used. @@ -58,7 +86,7 @@ public class AdaptiveServerSelectorTest { _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE, CommonConstants.Broker.AdaptiveServerSelector.Type.NUM_INFLIGHT_REQ.name()); cfg = new PinotConfiguration(_properties); - serverRoutingStatsManager = new ServerRoutingStatsManager(cfg); + serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics); assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager, cfg) instanceof NumInFlightReqSelector); @@ -66,7 +94,7 @@ public class AdaptiveServerSelectorTest { _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE, CommonConstants.Broker.AdaptiveServerSelector.Type.LATENCY.name()); cfg = new PinotConfiguration(_properties); - serverRoutingStatsManager = new ServerRoutingStatsManager(cfg); + serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics); assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager, cfg) instanceof LatencySelector); @@ -74,7 +102,7 @@ public class AdaptiveServerSelectorTest { _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE, CommonConstants.Broker.AdaptiveServerSelector.Type.HYBRID.name()); cfg = new PinotConfiguration(_properties); - serverRoutingStatsManager = new ServerRoutingStatsManager(cfg); + serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics); assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager, cfg) instanceof HybridSelector); @@ -82,7 +110,7 @@ public class AdaptiveServerSelectorTest { assertThrows(IllegalArgumentException.class, () -> { _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE, "Dummy"); PinotConfiguration config = new PinotConfiguration(_properties); - ServerRoutingStatsManager manager = new ServerRoutingStatsManager(config); + ServerRoutingStatsManager manager = new ServerRoutingStatsManager(config, _brokerMetrics); AdaptiveServerSelectorFactory.getAdaptiveServerSelector(manager, config); }); } @@ -91,7 +119,7 @@ public class AdaptiveServerSelectorTest { public void testNumInFlightReqSelector() { _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true); PinotConfiguration cfg = new PinotConfiguration(_properties); - ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg); + ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics); serverRoutingStatsManager.init(); assertTrue(serverRoutingStatsManager.isEnabled()); long taskCount = 0; @@ -290,7 +318,7 @@ public class AdaptiveServerSelectorTest { _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL, avgInitializationVal); PinotConfiguration cfg = new PinotConfiguration(_properties); - ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg); + ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics); serverRoutingStatsManager.init(); assertTrue(serverRoutingStatsManager.isEnabled()); long taskCount = 0; @@ -430,7 +458,7 @@ public class AdaptiveServerSelectorTest { hybridSelectorExponent); PinotConfiguration cfg = new PinotConfiguration(_properties); - ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg); + ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics); serverRoutingStatsManager.init(); assertTrue(serverRoutingStatsManager.isEnabled()); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java index 25e66eabd0..601544b9f1 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java @@ -56,7 +56,12 @@ public enum BrokerGauge implements AbstractMetrics.Gauge { * The cache size used by the allocator for normal arenas */ NETTY_POOLED_THREADLOCALCACHE("bytes", true), - NETTY_POOLED_CHUNK_SIZE("bytes", true); + NETTY_POOLED_CHUNK_SIZE("bytes", true), + + /** + * The queue size of ServerRoutingStatsManager main executor service. + */ + ROUTING_STATS_MANAGER_QUEUE_SIZE("routingStatsManagerQueueSize", true); private final String _brokerGaugeName; private final String _unit; diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala index 48255233a9..1a0443203b 100644 --- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala +++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala @@ -48,7 +48,7 @@ private[reader] class PinotServerDataFetcher( private val metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry private val brokerMetrics = new BrokerMetrics(metricsRegistry) private val pinotConfig = new PinotConfiguration() - private val serverRoutingStatsManager = new ServerRoutingStatsManager(pinotConfig) + private val serverRoutingStatsManager = new ServerRoutingStatsManager(pinotConfig, brokerMetrics) private val queryRouter = new QueryRouter(brokerId, brokerMetrics, serverRoutingStatsManager) // TODO add support for TLS-secured server diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java index 87c80566c1..1057fc084f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java @@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.metrics.BrokerGauge; +import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants.Broker.AdaptiveServerSelector; import org.slf4j.Logger; @@ -46,6 +48,7 @@ public class ServerRoutingStatsManager { private static final Logger LOGGER = LoggerFactory.getLogger(ServerRoutingStatsManager.class); private final PinotConfiguration _config; + private final BrokerMetrics _brokerMetrics; private volatile boolean _isEnabled; private ConcurrentHashMap<String, ServerRoutingStatsEntry> _serverQueryStatsMap; @@ -61,8 +64,9 @@ public class ServerRoutingStatsManager { private double _avgInitializationVal; private int _hybridScoreExponent; - public ServerRoutingStatsManager(PinotConfiguration pinotConfig) { + public ServerRoutingStatsManager(PinotConfiguration pinotConfig, BrokerMetrics brokerMetrics) { _config = pinotConfig; + _brokerMetrics = brokerMetrics; } public void init() { @@ -138,9 +142,9 @@ public class ServerRoutingStatsManager { return; } - // TODO: Track Executor qSize and alert if it crosses a threshold. _executorService.execute(() -> { try { + recordQueueSizeMetrics(); updateStatsAfterQuerySubmission(serverInstanceId); } catch (Exception e) { LOGGER.error("Exception caught while updating stats. requestId={}, exception={}", requestId, e); @@ -377,4 +381,9 @@ public class ServerRoutingStatsManager { stats.getServerReadLock().unlock(); } } + + private void recordQueueSizeMetrics() { + int queueSize = getQueueSize(); + _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ROUTING_STATS_MANAGER_QUEUE_SIZE, queueSize); + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index 07aa0e2c08..cbf4174bca 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -73,7 +73,7 @@ public class QueryRoutingTest { Map<String, Object> properties = new HashMap<>(); properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true); PinotConfiguration cfg = new PinotConfiguration(properties); - _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg); + _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, mock(BrokerMetrics.class)); _serverRoutingStatsManager.init(); _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), _serverRoutingStatsManager); _requestCount = 0; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java index 73dd623b5b..84983f87ec 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java @@ -18,13 +18,18 @@ */ package org.apache.pinot.core.transport.server.routing.stats; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.metrics.PinotMetricUtils; +import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -34,20 +39,44 @@ import static org.testng.Assert.assertTrue; public class ServerRoutingStatsManagerTest { + private BrokerMetrics _brokerMetrics; + + @BeforeTest + public void initBrokerMetrics() { + // Set up metric registry and broker metrics + PinotConfiguration brokerConfig = new PinotConfiguration(); + PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry( + brokerConfig.subset(CommonConstants.Broker.METRICS_CONFIG_PREFIX)); + _brokerMetrics = new BrokerMetrics( + brokerConfig.getProperty( + CommonConstants.Broker.CONFIG_OF_METRICS_NAME_PREFIX, + CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX), + metricsRegistry, + brokerConfig.getProperty( + CommonConstants.Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS, + CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS), + brokerConfig.getProperty( + CommonConstants.Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS, + Collections.emptyList())); + _brokerMetrics.initializeGlobalMeters(); + BrokerMetrics.register(_brokerMetrics); + } + @Test public void testInitAndShutDown() { Map<String, Object> properties = new HashMap<>(); // Test 1: Test disabled. properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, false); - ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties)); + ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties), + _brokerMetrics); assertFalse(manager.isEnabled()); manager.init(); assertFalse(manager.isEnabled()); // Test 2: Test enabled. properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true); - manager = new ServerRoutingStatsManager(new PinotConfiguration(properties)); + manager = new ServerRoutingStatsManager(new PinotConfiguration(properties), _brokerMetrics); assertFalse(manager.isEnabled()); manager.init(); assertTrue(manager.isEnabled()); @@ -64,7 +93,8 @@ public class ServerRoutingStatsManagerTest { public void testEmptyStats() { Map<String, Object> properties = new HashMap<>(); properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true); - ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties)); + ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties), + _brokerMetrics); manager.init(); List<Pair<String, Integer>> numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers(); @@ -94,7 +124,8 @@ public class ServerRoutingStatsManagerTest { properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS, 0); properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL, 0.0); properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT, 3); - ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties)); + ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties), + _brokerMetrics); manager.init(); int requestId = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org