This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 43089b15f3 Add netty memory related metrics for GrpcMailboxServer (#15651)
43089b15f3 is described below

commit 43089b15f3ce1ceb30ae0469b8cf95b5ea98a50a
Author: Songqiao Su <andysongq...@gmail.com>
AuthorDate: Sun Apr 27 00:53:00 2025 -0700

    Add netty memory related metrics for GrpcMailboxServer (#15651)
---
 .../apache/pinot/common/metrics/BrokerGauge.java   | 12 +++-
 .../apache/pinot/common/metrics/ServerGauge.java   | 12 +++-
 .../apache/pinot/common/metrics/ServerMetrics.java |  4 +-
 .../query/mailbox/channel/GrpcMailboxServer.java   | 64 ++++++++++++++++------
 4 files changed, 70 insertions(+), 22 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index 6b353a47fa..26c1c871df 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -77,7 +77,17 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
   /**
    * The estimated number of query server threads for all currently running multi-stage queries.
    */
-  ESTIMATED_MSE_SERVER_THREADS("number", true);
+  ESTIMATED_MSE_SERVER_THREADS("number", true),
+
+  // GrpcMailboxServer memory metrics
+  MAILBOX_SERVER_USED_DIRECT_MEMORY("bytes", true),
+  MAILBOX_SERVER_USED_HEAP_MEMORY("bytes", true),
+  MAILBOX_SERVER_ARENAS_DIRECT("arenas", true),
+  MAILBOX_SERVER_ARENAS_HEAP("arenas", true),
+  MAILBOX_SERVER_CACHE_SIZE_SMALL("bytes", true),
+  MAILBOX_SERVER_CACHE_SIZE_NORMAL("bytes", true),
+  MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
+  MAILBOX_SERVER_CHUNK_SIZE("bytes", true);
 
   private final String _brokerGaugeName;
   private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 5f894543d8..1f98d3fbc1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -97,7 +97,17 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   REALTIME_CONSUMER_DIR_USAGE("bytes", true),
   SEGMENT_DOWNLOAD_SPEED("bytes", true),
   PREDOWNLOAD_SPEED("bytes", true),
-  ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true);
+  ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true),
+
+  // GrpcMailboxServer memory metrics
+  MAILBOX_SERVER_USED_DIRECT_MEMORY("bytes", true),
+  MAILBOX_SERVER_USED_HEAP_MEMORY("bytes", true),
+  MAILBOX_SERVER_ARENAS_DIRECT("arenas", true),
+  MAILBOX_SERVER_ARENAS_HEAP("arenas", true),
+  MAILBOX_SERVER_CACHE_SIZE_SMALL("bytes", true),
+  MAILBOX_SERVER_CACHE_SIZE_NORMAL("bytes", true),
+  MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
+  MAILBOX_SERVER_CHUNK_SIZE("bytes", true);
 
   private final String _gaugeName;
   private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
index 7e253d0012..6f91cd2914 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
@@ -55,9 +55,7 @@ public class ServerMetrics extends AbstractMetrics<ServerQueryPhase, ServerMeter
    * should always call after registration
    */
   public static ServerMetrics get() {
-    ServerMetrics ret = SERVER_METRICS_INSTANCE.get();
-    assert ret != null;
-    return ret;
+    return SERVER_METRICS_INSTANCE.get();
   }
 
   public ServerMetrics(PinotMetricsRegistry metricsRegistry) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index 61d7f06171..2217c0e648 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -19,13 +19,19 @@
 package org.apache.pinot.query.mailbox.channel;
 
 import io.grpc.Server;
-import io.grpc.ServerBuilder;
 import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
+import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator;
+import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric;
+import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
@@ -49,24 +55,48 @@ public class GrpcMailboxServer extends PinotMailboxGrpc.PinotMailboxImplBase {
   public GrpcMailboxServer(MailboxService mailboxService, PinotConfiguration config, @Nullable TlsConfig tlsConfig) {
     _mailboxService = mailboxService;
     int port = mailboxService.getPort();
+
+    PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true);
+    PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+
+    // Register memory metrics - use ServerMetrics if available, otherwise use BrokerMetrics
+    ServerMetrics serverMetrics = ServerMetrics.get();
+    BrokerMetrics brokerMetrics = BrokerMetrics.get();
+    if (serverMetrics != null) {
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_USED_DIRECT_MEMORY, metric::usedDirectMemory);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_USED_HEAP_MEMORY, metric::usedHeapMemory);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_ARENAS_DIRECT, metric::numDirectArenas);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_ARENAS_HEAP, metric::numHeapArenas);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_CACHE_SIZE_SMALL, metric::smallCacheSize);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_CACHE_SIZE_NORMAL, metric::normalCacheSize);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_THREADLOCALCACHE, metric::numThreadLocalCaches);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_CHUNK_SIZE, metric::chunkSize);
+    } else if (brokerMetrics != null) {
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_USED_DIRECT_MEMORY, metric::usedDirectMemory);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_USED_HEAP_MEMORY, metric::usedHeapMemory);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_ARENAS_DIRECT, metric::numDirectArenas);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_ARENAS_HEAP, metric::numHeapArenas);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_CACHE_SIZE_SMALL, metric::smallCacheSize);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_CACHE_SIZE_NORMAL, metric::normalCacheSize);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_THREADLOCALCACHE, metric::numThreadLocalCaches);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_CHUNK_SIZE, metric::chunkSize);
+    }
+
+    NettyServerBuilder builder = NettyServerBuilder
+        .forPort(port)
+        .addService(this)
+        .withOption(ChannelOption.ALLOCATOR, bufAllocator)
+        .withChildOption(ChannelOption.ALLOCATOR, bufAllocator)
+        .maxInboundMessageSize(config.getProperty(
+            CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+            CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES));
+
+    // Add SSL context only if TLS is configured
     if (tlsConfig != null) {
-      _server = NettyServerBuilder
-          .forPort(port)
-          .addService(this)
-          .sslContext(GrpcQueryServer.buildGrpcSslContext(tlsConfig))
-          .maxInboundMessageSize(config.getProperty(
-              CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
-              CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES))
-          .build();
-    } else {
-      _server = ServerBuilder
-          .forPort(port)
-          .addService(this)
-          .maxInboundMessageSize(config.getProperty(
-              CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
-              CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES))
-          .build();
+      builder.sslContext(GrpcQueryServer.buildGrpcSslContext(tlsConfig));
     }
+
+    _server = builder.build();
   }
 
   public void start() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to