This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new fd802fcf9a Add netty memory related metrics for grpc query/broker servers (#15625) fd802fcf9a is described below commit fd802fcf9a5fd6cc3d70b3e853ae3978d9957167 Author: Songqiao Su <andysongq...@gmail.com> AuthorDate: Sun Apr 27 05:53:57 2025 -0700 Add netty memory related metrics for grpc query/broker servers (#15625) * add metrics for grpc server * fmt * apply to broker grpc server as well * add .withChildOption * use own allocator instance * Update BrokerGauge.java * Update ServerGauge.java --------- Co-authored-by: Xiang Fu <xiangfu.1...@gmail.com> --- .../apache/pinot/broker/grpc/BrokerGrpcServer.java | 76 +++++++++++++++++----- .../apache/pinot/common/metrics/BrokerGauge.java | 10 +++ .../apache/pinot/common/metrics/ServerGauge.java | 10 +++ .../pinot/core/transport/grpc/GrpcQueryServer.java | 44 ++++++++++--- 4 files changed, 112 insertions(+), 28 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java index cc8ccb9b5f..706505a0b0 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java @@ -24,11 +24,13 @@ import com.google.protobuf.ByteString; import io.grpc.Attributes; import io.grpc.Grpc; import io.grpc.Server; -import io.grpc.ServerBuilder; import io.grpc.ServerTransportFilter; import io.grpc.Status; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator; +import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric; +import io.grpc.netty.shaded.io.netty.channel.ChannelOption; import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; @@ -44,6 +46,7 @@ import org.apache.pinot.common.compression.CompressionFactory; import org.apache.pinot.common.compression.Compressor; import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.config.TlsConfig; +import org.apache.pinot.common.metrics.BrokerGauge; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.proto.Broker; @@ -106,28 +109,65 @@ public class BrokerGrpcServer extends PinotQueryBrokerGrpc.PinotQueryBrokerImplB _queryClientConfig = createQueryClientConfig(brokerConf); LOGGER.info("gRPC query client config: usePlainText {}", _queryClientConfig.isUsePlainText()); _secureGrpcPort = brokerConf.getProperty(CommonConstants.Broker.Grpc.KEY_OF_GRPC_TLS_PORT, -1); + _brokerId = brokerId; + _brokerRequestHandler = brokerRequestHandler; + + // Determine which port to use + int portToUse; + boolean isSecure = false; + if (_secureGrpcPort > 0) { - try { - TlsConfig tlsConfig = TlsUtils.extractTlsConfig(brokerConf, CommonConstants.Broker.BROKER_TLS_PREFIX); - LOGGER.info("Creating Secure gRPC Server in port {}", _secureGrpcPort); - _server = - NettyServerBuilder.forPort(_secureGrpcPort).sslContext(buildGRpcSslContext(tlsConfig)).addService(this) - .addTransportFilter(new BrokerGrpcTransportFilter()).build(); - } catch (Exception e) { - throw new RuntimeException("Failed to start secure grpcQueryServer", e); - } + portToUse = _secureGrpcPort; + isSecure = true; + LOGGER.info("Creating Secure gRPC Server on port {}", portToUse); } else if (_grpcPort > 0) { - LOGGER.info("Creating plain text gRPC Server in port {}", _grpcPort); - _server = - ServerBuilder.forPort(_grpcPort).addService(this).addTransportFilter(new BrokerGrpcTransportFilter()).build(); + portToUse = _grpcPort; + LOGGER.info("Creating plain text gRPC Server on port {}", portToUse); } else { - LOGGER.info("Not creating gRPC Server due to the grpc port is {} and secureGrpcPort is {}", _grpcPort, - _secureGrpcPort); + LOGGER.info("Not creating gRPC Server due to the grpc port is {} and secureGrpcPort is {}", + _grpcPort, _secureGrpcPort); _server = null; + return; } - _brokerId = brokerId; - _brokerRequestHandler = brokerRequestHandler; - LOGGER.info("Initialized BrokerGrpcServer on port: {}", _grpcPort); + + try { + // Create buffer allocator and register metrics + PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true); + registerBufferMetrics(bufAllocator, brokerMetrics); + + // Build the server with common configuration + NettyServerBuilder builder = NettyServerBuilder.forPort(portToUse) + .addService(this) + .addTransportFilter(new BrokerGrpcTransportFilter()) + .withChildOption(ChannelOption.ALLOCATOR, bufAllocator) + .withOption(ChannelOption.ALLOCATOR, bufAllocator); + + // Add SSL context only for secure connection + if (isSecure) { + TlsConfig tlsConfig = TlsUtils.extractTlsConfig(brokerConf, CommonConstants.Broker.BROKER_TLS_PREFIX); + builder.sslContext(buildGRpcSslContext(tlsConfig)); + } + _server = builder.build(); + } catch (Exception e) { + throw new RuntimeException("Failed to start gRPC server", e); + } + + LOGGER.info("Initialized BrokerGrpcServer on port: {}", portToUse); + } + + /** + * Registers buffer metrics for the given allocator. + */ + private void registerBufferMetrics(PooledByteBufAllocator bufAllocator, BrokerMetrics brokerMetrics) { + PooledByteBufAllocatorMetric metric = bufAllocator.metric(); + brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_USED_DIRECT_MEMORY, metric::usedDirectMemory); + brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_USED_HEAP_MEMORY, metric::usedHeapMemory); + brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_ARENAS_DIRECT, metric::numDirectArenas); + brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_ARENAS_HEAP, metric::numHeapArenas); + brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_CACHE_SIZE_SMALL, metric::smallCacheSize); + brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_CACHE_SIZE_NORMAL, metric::normalCacheSize); + brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_THREADLOCALCACHE, metric::numThreadLocalCaches); + brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_CHUNK_SIZE, metric::chunkSize); } public void start() { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java index 26c1c871df..c2e602e03e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java @@ -79,6 +79,16 @@ public enum BrokerGauge implements AbstractMetrics.Gauge { */ ESTIMATED_MSE_SERVER_THREADS("number", true), + // gRPC Netty buffer metrics + GRPC_NETTY_POOLED_USED_DIRECT_MEMORY("bytes", true), + GRPC_NETTY_POOLED_USED_HEAP_MEMORY("bytes", true), + GRPC_NETTY_POOLED_ARENAS_DIRECT("arenas", true), + GRPC_NETTY_POOLED_ARENAS_HEAP("arenas", true), + GRPC_NETTY_POOLED_CACHE_SIZE_SMALL("bytes", true), + GRPC_NETTY_POOLED_CACHE_SIZE_NORMAL("bytes", true), + GRPC_NETTY_POOLED_THREADLOCALCACHE("bytes", true), + GRPC_NETTY_POOLED_CHUNK_SIZE("bytes", true), + // GrpcMailboxServer memory metrics MAILBOX_SERVER_USED_DIRECT_MEMORY("bytes", true), MAILBOX_SERVER_USED_HEAP_MEMORY("bytes", true), diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java index 1f98d3fbc1..efc98965b7 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java @@ -99,6 +99,16 @@ public enum ServerGauge implements AbstractMetrics.Gauge { PREDOWNLOAD_SPEED("bytes", true), ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true), + // gRPC Netty buffer metrics + GRPC_NETTY_POOLED_USED_DIRECT_MEMORY("bytes", true), + GRPC_NETTY_POOLED_USED_HEAP_MEMORY("bytes", true), + GRPC_NETTY_POOLED_ARENAS_DIRECT("arenas", true), + GRPC_NETTY_POOLED_ARENAS_HEAP("arenas", true), + GRPC_NETTY_POOLED_CACHE_SIZE_SMALL("bytes", true), + GRPC_NETTY_POOLED_CACHE_SIZE_NORMAL("bytes", true), + GRPC_NETTY_POOLED_THREADLOCALCACHE("bytes", true), + GRPC_NETTY_POOLED_CHUNK_SIZE("bytes", true), + // GrpcMailboxServer memory metrics MAILBOX_SERVER_USED_DIRECT_MEMORY("bytes", true), MAILBOX_SERVER_USED_HEAP_MEMORY("bytes", true), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java index 72a116c679..3ece458708 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java @@ -21,11 +21,13 @@ package org.apache.pinot.core.transport.grpc; import io.grpc.Attributes; import io.grpc.Grpc; import io.grpc.Server; -import io.grpc.ServerBuilder; import io.grpc.ServerTransportFilter; import io.grpc.Status; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator; +import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric; +import io.grpc.netty.shaded.io.netty.channel.ChannelOption; import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; @@ -41,6 +43,7 @@ import nl.altindag.ssl.SSLFactory; import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.metrics.ServerTimer; @@ -62,6 +65,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + // TODO: Plug in QueryScheduler public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBase { private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryServer.class); @@ -107,17 +111,37 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa ); _queryExecutor = queryExecutor; _serverMetrics = serverMetrics; - if (tlsConfig != null) { - try { - _server = NettyServerBuilder.forPort(port).sslContext(buildGrpcSslContext(tlsConfig)) - .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()).addService(this) - .addTransportFilter(new GrpcQueryTransportFilter()).build(); - } catch (Exception e) { - throw new RuntimeException("Failed to start secure grpcQueryServer", e); + + try { + NettyServerBuilder builder = NettyServerBuilder.forPort(port); + if (tlsConfig != null) { + builder.sslContext(buildGrpcSslContext(tlsConfig)); } - } else { - _server = ServerBuilder.forPort(port).addService(this).addTransportFilter(new GrpcQueryTransportFilter()).build(); + + // Add metrics for Netty buffer allocator + PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true); + PooledByteBufAllocatorMetric metric = bufAllocator.metric(); + ServerMetrics metrics = ServerMetrics.get(); + metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_USED_DIRECT_MEMORY, metric::usedDirectMemory); + metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_USED_HEAP_MEMORY, metric::usedHeapMemory); + metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_ARENAS_DIRECT, metric::numDirectArenas); + metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_ARENAS_HEAP, metric::numHeapArenas); + metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_CACHE_SIZE_SMALL, metric::smallCacheSize); + metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_CACHE_SIZE_NORMAL, metric::normalCacheSize); + metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_THREADLOCALCACHE, metric::numThreadLocalCaches); + metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_CHUNK_SIZE, metric::chunkSize); + + _server = builder + .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()) + .addService(this) + .addTransportFilter(new GrpcQueryTransportFilter()) + .withOption(ChannelOption.ALLOCATOR, bufAllocator) + .withChildOption(ChannelOption.ALLOCATOR, bufAllocator) + .build(); + } catch (Exception e) { + throw new RuntimeException("Failed to start secure grpcQueryServer", e); } + _accessControl = accessControl; LOGGER.info("Initialized GrpcQueryServer on port: {} with numWorkerThreads: {}", port, ResourceManager.DEFAULT_QUERY_WORKER_THREADS); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org