This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b004958d9 Add metrics for monitoring server's message queue size (#15722)
3b004958d9 is described below

commit 3b004958d97f345ba91d2afd1e45a70c70163bbc
Author: Songqiao Su <andysongq...@gmail.com>
AuthorDate: Wed May 7 15:21:03 2025 -0700

    Add metrics for monitoring server's message queue size (#15722)
---
 .../apache/pinot/common/metrics/ServerGauge.java   |  5 ++++-
 .../server/starter/helix/BaseServerStarter.java    | 26 ++++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  4 ++++
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index efc98965b7..41596ddf44 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -117,7 +117,10 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   MAILBOX_SERVER_CACHE_SIZE_SMALL("bytes", true),
   MAILBOX_SERVER_CACHE_SIZE_NORMAL("bytes", true),
   MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
-  MAILBOX_SERVER_CHUNK_SIZE("bytes", true);
+  MAILBOX_SERVER_CHUNK_SIZE("bytes", true),
+
+  // how many message are there in the server's message queue in helix
+  HELIX_MESSAGES_COUNT("count", true);
 
   private final String _gaugeName;
   private final String _unit;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 5704d44875..94cbe95b01 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -21,6 +21,7 @@ package org.apache.pinot.server.starter.helix;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -32,12 +33,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -162,6 +166,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
   protected SegmentOperationsThrottler _segmentOperationsThrottler;
   protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler;
   protected volatile boolean _isServerReadyToServeQueries = false;
+  private ScheduledExecutorService _helixMessageCountScheduler;
 
   @Override
   public void init(PinotConfiguration serverConf)
@@ -698,6 +703,14 @@ public abstract class BaseServerStarter implements ServiceStartable {
     _helixAdmin = _helixManager.getClusterManagmentTool();
     updateInstanceConfigIfNeeded(serverConf);
 
+    // Start a background task to monitor Helix message count
+    int refreshIntervalSeconds = 
_serverConf.getProperty(Server.CONFIG_OF_MESSAGES_COUNT_REFRESH_INTERVAL_SECONDS,
+        Server.DEFAULT_MESSAGES_COUNT_REFRESH_INTERVAL_SECONDS);
+    _helixMessageCountScheduler = Executors.newSingleThreadScheduledExecutor(
+        new 
ThreadFactoryBuilder().setNameFormat("message-count-scheduler-%d").setDaemon(true).build());
+    _helixMessageCountScheduler.scheduleAtFixedRate(this::refreshMessageCount, 
0, refreshIntervalSeconds,
+        TimeUnit.SECONDS);
+
     LOGGER.info("Initializing and registering the 
DefaultClusterConfigChangeHandler");
     try {
       _helixManager.addClusterfigChangeListener(_clusterConfigChangeHandler);
@@ -1058,4 +1071,17 @@ public abstract class BaseServerStarter implements ServiceStartable {
   protected AdminApiApplication createServerAdminApp() {
     return new AdminApiApplication(_serverInstance, _accessControlFactory, 
_serverConf);
   }
+
+  private void refreshMessageCount() { // sets HELIX_MESSAGES_COUNT to the child count under this instance's MESSAGES path
+    try {
+      HelixDataAccessor dataAccessor = _helixManager.getHelixDataAccessor();
+      List<String> children = dataAccessor.getBaseDataAccessor()
+          .getChildNames(String.format("/%s/INSTANCES/%s/MESSAGES", _helixClusterName, _instanceId), 0);
+      int messageCount = children == null ? 0 : children.size(); // a null child list is reported as 0
+      ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
+      serverMetrics.setValueOfGlobalGauge(ServerGauge.HELIX_MESSAGES_COUNT, messageCount);
+    } catch (Exception e) { // must not propagate: an uncaught exception cancels later scheduleAtFixedRate runs
+      LOGGER.warn("Failed to refresh Helix message count", e);
+    }
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1fbe76a540..cd6308a30e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1041,6 +1041,10 @@ public class CommonConstants {
     public static final String LUCENE_MIN_REFRESH_INTERVAL_MS = 
"pinot.server.lucene.min.refresh.interval.ms";
     public static final int DEFAULT_LUCENE_MIN_REFRESH_INTERVAL_MS = 10;
 
+    public static final String 
CONFIG_OF_MESSAGES_COUNT_REFRESH_INTERVAL_SECONDS =
+        "pinot.server.messagesCount.refreshIntervalSeconds";
+    public static final int DEFAULT_MESSAGES_COUNT_REFRESH_INTERVAL_SECONDS = 
30;
+
     public static class SegmentCompletionProtocol {
       public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = 
"pinot.server.segment.uploader";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to