This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3b004958d9 Add metrics for monitoring server's message queue size (#15722) 3b004958d9 is described below commit 3b004958d97f345ba91d2afd1e45a70c70163bbc Author: Songqiao Su <andysongq...@gmail.com> AuthorDate: Wed May 7 15:21:03 2025 -0700 Add metrics for monitoring server's message queue size (#15722) --- .../apache/pinot/common/metrics/ServerGauge.java | 5 ++++- .../server/starter/helix/BaseServerStarter.java | 26 ++++++++++++++++++++++ .../apache/pinot/spi/utils/CommonConstants.java | 4 ++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java index efc98965b7..41596ddf44 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java @@ -117,7 +117,10 @@ public enum ServerGauge implements AbstractMetrics.Gauge { MAILBOX_SERVER_CACHE_SIZE_SMALL("bytes", true), MAILBOX_SERVER_CACHE_SIZE_NORMAL("bytes", true), MAILBOX_SERVER_THREADLOCALCACHE("bytes", true), - MAILBOX_SERVER_CHUNK_SIZE("bytes", true); + MAILBOX_SERVER_CHUNK_SIZE("bytes", true), + + // how many message are there in the server's message queue in helix + HELIX_MESSAGES_COUNT("count", true); private final String _gaugeName; private final String _unit; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 5704d44875..94cbe95b01 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -21,6 +21,7 @@ package org.apache.pinot.server.starter.helix; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -32,12 +33,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; @@ -162,6 +166,7 @@ public abstract class BaseServerStarter implements ServiceStartable { protected SegmentOperationsThrottler _segmentOperationsThrottler; protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler; protected volatile boolean _isServerReadyToServeQueries = false; + private ScheduledExecutorService _helixMessageCountScheduler; @Override public void init(PinotConfiguration serverConf) @@ -698,6 +703,14 @@ public abstract class BaseServerStarter implements ServiceStartable { _helixAdmin = _helixManager.getClusterManagmentTool(); updateInstanceConfigIfNeeded(serverConf); + // Start a background task to monitor Helix message count + int refreshIntervalSeconds = _serverConf.getProperty(Server.CONFIG_OF_MESSAGES_COUNT_REFRESH_INTERVAL_SECONDS, + Server.DEFAULT_MESSAGES_COUNT_REFRESH_INTERVAL_SECONDS); + _helixMessageCountScheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("message-count-scheduler-%d").setDaemon(true).build()); + _helixMessageCountScheduler.scheduleAtFixedRate(this::refreshMessageCount, 0, 
refreshIntervalSeconds, + TimeUnit.SECONDS); + LOGGER.info("Initializing and registering the DefaultClusterConfigChangeHandler"); try { _helixManager.addClusterfigChangeListener(_clusterConfigChangeHandler); @@ -1058,4 +1071,17 @@ public abstract class BaseServerStarter implements ServiceStartable { protected AdminApiApplication createServerAdminApp() { return new AdminApiApplication(_serverInstance, _accessControlFactory, _serverConf); } + + private void refreshMessageCount() { + try { + HelixDataAccessor dataAccessor = _helixManager.getHelixDataAccessor(); + List<String> children = dataAccessor.getBaseDataAccessor() + .getChildNames(String.format("/%s/INSTANCES/%s/MESSAGES", _helixClusterName, _instanceId), 0); + int messageCount = children == null ? 0 : children.size(); + ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); + serverMetrics.setValueOfGlobalGauge(ServerGauge.HELIX_MESSAGES_COUNT, messageCount); + } catch (Exception e) { + LOGGER.warn("Failed to refresh Helix message count", e); + } + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 1fbe76a540..cd6308a30e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1041,6 +1041,10 @@ public class CommonConstants { public static final String LUCENE_MIN_REFRESH_INTERVAL_MS = "pinot.server.lucene.min.refresh.interval.ms"; public static final int DEFAULT_LUCENE_MIN_REFRESH_INTERVAL_MS = 10; + public static final String CONFIG_OF_MESSAGES_COUNT_REFRESH_INTERVAL_SECONDS = + "pinot.server.messagesCount.refreshIntervalSeconds"; + public static final int DEFAULT_MESSAGES_COUNT_REFRESH_INTERVAL_SECONDS = 30; + public static class SegmentCompletionProtocol { public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = "pinot.server.segment.uploader"; 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org