This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fd802fcf9a Add netty memory related metrics for grpc query/broker 
servers (#15625)
fd802fcf9a is described below

commit fd802fcf9a5fd6cc3d70b3e853ae3978d9957167
Author: Songqiao Su <andysongq...@gmail.com>
AuthorDate: Sun Apr 27 05:53:57 2025 -0700

    Add netty memory related metrics for grpc query/broker servers (#15625)
    
    * add metrics for grpc server
    
    * fmt
    
    * apply to broker grpc server as well
    
    * add .withChildOption
    
    * use own allocator instance
    
    * Update BrokerGauge.java
    
    * Update ServerGauge.java
    
    ---------
    
    Co-authored-by: Xiang Fu <xiangfu.1...@gmail.com>
---
 .../apache/pinot/broker/grpc/BrokerGrpcServer.java | 76 +++++++++++++++++-----
 .../apache/pinot/common/metrics/BrokerGauge.java   | 10 +++
 .../apache/pinot/common/metrics/ServerGauge.java   | 10 +++
 .../pinot/core/transport/grpc/GrpcQueryServer.java | 44 ++++++++++---
 4 files changed, 112 insertions(+), 28 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java 
b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
index cc8ccb9b5f..706505a0b0 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
@@ -24,11 +24,13 @@ import com.google.protobuf.ByteString;
 import io.grpc.Attributes;
 import io.grpc.Grpc;
 import io.grpc.Server;
-import io.grpc.ServerBuilder;
 import io.grpc.ServerTransportFilter;
 import io.grpc.Status;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
+import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator;
+import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric;
+import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
 import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
@@ -44,6 +46,7 @@ import org.apache.pinot.common.compression.CompressionFactory;
 import org.apache.pinot.common.compression.Compressor;
 import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.proto.Broker;
@@ -106,28 +109,65 @@ public class BrokerGrpcServer extends 
PinotQueryBrokerGrpc.PinotQueryBrokerImplB
     _queryClientConfig = createQueryClientConfig(brokerConf);
     LOGGER.info("gRPC query client config: usePlainText {}", 
_queryClientConfig.isUsePlainText());
     _secureGrpcPort = 
brokerConf.getProperty(CommonConstants.Broker.Grpc.KEY_OF_GRPC_TLS_PORT, -1);
+    _brokerId = brokerId;
+    _brokerRequestHandler = brokerRequestHandler;
+
+    // Determine which port to use
+    int portToUse;
+    boolean isSecure = false;
+
     if (_secureGrpcPort > 0) {
-      try {
-        TlsConfig tlsConfig = TlsUtils.extractTlsConfig(brokerConf, 
CommonConstants.Broker.BROKER_TLS_PREFIX);
-        LOGGER.info("Creating Secure gRPC Server in port {}", _secureGrpcPort);
-        _server =
-            
NettyServerBuilder.forPort(_secureGrpcPort).sslContext(buildGRpcSslContext(tlsConfig)).addService(this)
-                .addTransportFilter(new BrokerGrpcTransportFilter()).build();
-      } catch (Exception e) {
-        throw new RuntimeException("Failed to start secure grpcQueryServer", 
e);
-      }
+      portToUse = _secureGrpcPort;
+      isSecure = true;
+      LOGGER.info("Creating Secure gRPC Server on port {}", portToUse);
     } else if (_grpcPort > 0) {
-      LOGGER.info("Creating plain text gRPC Server in port {}", _grpcPort);
-      _server =
-          
ServerBuilder.forPort(_grpcPort).addService(this).addTransportFilter(new 
BrokerGrpcTransportFilter()).build();
+      portToUse = _grpcPort;
+      LOGGER.info("Creating plain text gRPC Server on port {}", portToUse);
     } else {
-      LOGGER.info("Not creating gRPC Server due to the grpc port is {} and 
secureGrpcPort is {}", _grpcPort,
-          _secureGrpcPort);
+      LOGGER.info("Not creating gRPC Server due to the grpc port is {} and 
secureGrpcPort is {}",
+          _grpcPort, _secureGrpcPort);
       _server = null;
+      return;
     }
-    _brokerId = brokerId;
-    _brokerRequestHandler = brokerRequestHandler;
-    LOGGER.info("Initialized BrokerGrpcServer on port: {}", _grpcPort);
+
+    try {
+      // Create buffer allocator and register metrics
+      PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true);
+      registerBufferMetrics(bufAllocator, brokerMetrics);
+
+      // Build the server with common configuration
+      NettyServerBuilder builder = NettyServerBuilder.forPort(portToUse)
+          .addService(this)
+          .addTransportFilter(new BrokerGrpcTransportFilter())
+          .withChildOption(ChannelOption.ALLOCATOR, bufAllocator)
+          .withOption(ChannelOption.ALLOCATOR, bufAllocator);
+
+      // Add SSL context only for secure connection
+      if (isSecure) {
+        TlsConfig tlsConfig = TlsUtils.extractTlsConfig(brokerConf, 
CommonConstants.Broker.BROKER_TLS_PREFIX);
+        builder.sslContext(buildGRpcSslContext(tlsConfig));
+      }
+      _server = builder.build();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start gRPC server", e);
+    }
+
+    LOGGER.info("Initialized BrokerGrpcServer on port: {}", portToUse);
+  }
+
+  /**
+   * Registers buffer metrics for the given allocator.
+   *
+   * <p>Obtains the allocator's {@link PooledByteBufAllocatorMetric} and sets or updates one global
+   * broker gauge per reading: used direct memory, used heap memory, number of direct arenas,
+   * number of heap arenas, small cache size, normal cache size, number of thread-local caches and
+   * chunk size. Each gauge is handed a method reference to the metric object rather than a value
+   * captured at registration time.
+   *
+   * @param bufAllocator the pooled allocator whose metric object backs the gauges
+   * @param brokerMetrics the broker metrics instance on which the global gauges are registered
+   */
+  private void registerBufferMetrics(PooledByteBufAllocator bufAllocator, 
BrokerMetrics brokerMetrics) {
+    PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+    
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_USED_DIRECT_MEMORY,
 metric::usedDirectMemory);
+    
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_USED_HEAP_MEMORY,
 metric::usedHeapMemory);
+    
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_ARENAS_DIRECT,
 metric::numDirectArenas);
+    
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_ARENAS_HEAP, 
metric::numHeapArenas);
+    
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_CACHE_SIZE_SMALL,
 metric::smallCacheSize);
+    
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_CACHE_SIZE_NORMAL,
 metric::normalCacheSize);
+    
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_THREADLOCALCACHE,
 metric::numThreadLocalCaches);
+    
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_CHUNK_SIZE, 
metric::chunkSize);
   }
 
   public void start() {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index 26c1c871df..c2e602e03e 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -79,6 +79,16 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
    */
   ESTIMATED_MSE_SERVER_THREADS("number", true),
 
+  // gRPC Netty buffer metrics
+  GRPC_NETTY_POOLED_USED_DIRECT_MEMORY("bytes", true),
+  GRPC_NETTY_POOLED_USED_HEAP_MEMORY("bytes", true),
+  GRPC_NETTY_POOLED_ARENAS_DIRECT("arenas", true),
+  GRPC_NETTY_POOLED_ARENAS_HEAP("arenas", true),
+  GRPC_NETTY_POOLED_CACHE_SIZE_SMALL("bytes", true),
+  GRPC_NETTY_POOLED_CACHE_SIZE_NORMAL("bytes", true),
+  GRPC_NETTY_POOLED_THREADLOCALCACHE("bytes", true),
+  GRPC_NETTY_POOLED_CHUNK_SIZE("bytes", true),
+
   // GrpcMailboxServer memory metrics
   MAILBOX_SERVER_USED_DIRECT_MEMORY("bytes", true),
   MAILBOX_SERVER_USED_HEAP_MEMORY("bytes", true),
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 1f98d3fbc1..efc98965b7 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -99,6 +99,16 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   PREDOWNLOAD_SPEED("bytes", true),
   ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true),
 
+  // gRPC Netty buffer metrics
+  GRPC_NETTY_POOLED_USED_DIRECT_MEMORY("bytes", true),
+  GRPC_NETTY_POOLED_USED_HEAP_MEMORY("bytes", true),
+  GRPC_NETTY_POOLED_ARENAS_DIRECT("arenas", true),
+  GRPC_NETTY_POOLED_ARENAS_HEAP("arenas", true),
+  GRPC_NETTY_POOLED_CACHE_SIZE_SMALL("bytes", true),
+  GRPC_NETTY_POOLED_CACHE_SIZE_NORMAL("bytes", true),
+  GRPC_NETTY_POOLED_THREADLOCALCACHE("bytes", true),
+  GRPC_NETTY_POOLED_CHUNK_SIZE("bytes", true),
+
   // GrpcMailboxServer memory metrics
   MAILBOX_SERVER_USED_DIRECT_MEMORY("bytes", true),
   MAILBOX_SERVER_USED_HEAP_MEMORY("bytes", true),
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 72a116c679..3ece458708 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -21,11 +21,13 @@ package org.apache.pinot.core.transport.grpc;
 import io.grpc.Attributes;
 import io.grpc.Grpc;
 import io.grpc.Server;
-import io.grpc.ServerBuilder;
 import io.grpc.ServerTransportFilter;
 import io.grpc.Status;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
+import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator;
+import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric;
+import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
 import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
@@ -41,6 +43,7 @@ import nl.altindag.ssl.SSLFactory;
 import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerTimer;
@@ -62,6 +65,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+
 // TODO: Plug in QueryScheduler
 public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBase {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcQueryServer.class);
@@ -107,17 +111,37 @@ public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBa
     );
     _queryExecutor = queryExecutor;
     _serverMetrics = serverMetrics;
-    if (tlsConfig != null) {
-      try {
-        _server = 
NettyServerBuilder.forPort(port).sslContext(buildGrpcSslContext(tlsConfig))
-            
.maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()).addService(this)
-            .addTransportFilter(new GrpcQueryTransportFilter()).build();
-      } catch (Exception e) {
-        throw new RuntimeException("Failed to start secure grpcQueryServer", 
e);
+
+    try {
+      NettyServerBuilder builder = NettyServerBuilder.forPort(port);
+      if (tlsConfig != null) {
+        builder.sslContext(buildGrpcSslContext(tlsConfig));
       }
-    } else {
-      _server = 
ServerBuilder.forPort(port).addService(this).addTransportFilter(new 
GrpcQueryTransportFilter()).build();
+
+      // Add metrics for Netty buffer allocator
+      PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true);
+      PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+      ServerMetrics metrics = ServerMetrics.get();
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_USED_DIRECT_MEMORY,
 metric::usedDirectMemory);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_USED_HEAP_MEMORY, 
metric::usedHeapMemory);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_ARENAS_DIRECT, 
metric::numDirectArenas);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_ARENAS_HEAP, 
metric::numHeapArenas);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_CACHE_SIZE_SMALL, 
metric::smallCacheSize);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_CACHE_SIZE_NORMAL, 
metric::normalCacheSize);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_THREADLOCALCACHE, 
metric::numThreadLocalCaches);
+      metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_CHUNK_SIZE, 
metric::chunkSize);
+
+      _server = builder
+          .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
+          .addService(this)
+          .addTransportFilter(new GrpcQueryTransportFilter())
+          .withOption(ChannelOption.ALLOCATOR, bufAllocator)
+          .withChildOption(ChannelOption.ALLOCATOR, bufAllocator)
+          .build();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start secure grpcQueryServer", e);
     }
+
     _accessControl = accessControl;
     LOGGER.info("Initialized GrpcQueryServer on port: {} with 
numWorkerThreads: {}", port,
         ResourceManager.DEFAULT_QUERY_WORKER_THREADS);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to