This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new 43089b15f3 Add netty memory related metrics for GrpcMailboxServer (#15651)
43089b15f3 is described below

commit 43089b15f3ce1ceb30ae0469b8cf95b5ea98a50a
Author: Songqiao Su <andysongq...@gmail.com>
AuthorDate: Sun Apr 27 00:53:00 2025 -0700

    Add netty memory related metrics for GrpcMailboxServer (#15651)
---
 .../apache/pinot/common/metrics/BrokerGauge.java   | 12 +++-
 .../apache/pinot/common/metrics/ServerGauge.java   | 12 +++-
 .../apache/pinot/common/metrics/ServerMetrics.java |  4 +-
 .../query/mailbox/channel/GrpcMailboxServer.java   | 64 ++++++++++++++++------
 4 files changed, 70 insertions(+), 22 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index 6b353a47fa..26c1c871df 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -77,7 +77,17 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
   /**
    * The estimated number of query server threads for all currently running multi-stage queries.
    */
-  ESTIMATED_MSE_SERVER_THREADS("number", true);
+  ESTIMATED_MSE_SERVER_THREADS("number", true),
+
+  // GrpcMailboxServer memory metrics
+  MAILBOX_SERVER_USED_DIRECT_MEMORY("bytes", true),
+  MAILBOX_SERVER_USED_HEAP_MEMORY("bytes", true),
+  MAILBOX_SERVER_ARENAS_DIRECT("arenas", true),
+  MAILBOX_SERVER_ARENAS_HEAP("arenas", true),
+  MAILBOX_SERVER_CACHE_SIZE_SMALL("bytes", true),
+  MAILBOX_SERVER_CACHE_SIZE_NORMAL("bytes", true),
+  MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
+  MAILBOX_SERVER_CHUNK_SIZE("bytes", true);
 
   private final String _brokerGaugeName;
   private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 5f894543d8..1f98d3fbc1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -97,7 +97,17 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   REALTIME_CONSUMER_DIR_USAGE("bytes", true),
   SEGMENT_DOWNLOAD_SPEED("bytes", true),
   PREDOWNLOAD_SPEED("bytes", true),
-  ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true);
+  ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true),
+
+  // GrpcMailboxServer memory metrics
+  MAILBOX_SERVER_USED_DIRECT_MEMORY("bytes", true),
+  MAILBOX_SERVER_USED_HEAP_MEMORY("bytes", true),
+  MAILBOX_SERVER_ARENAS_DIRECT("arenas", true),
+  MAILBOX_SERVER_ARENAS_HEAP("arenas", true),
+  MAILBOX_SERVER_CACHE_SIZE_SMALL("bytes", true),
+  MAILBOX_SERVER_CACHE_SIZE_NORMAL("bytes", true),
+  MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
+  MAILBOX_SERVER_CHUNK_SIZE("bytes", true);
 
   private final String _gaugeName;
   private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
index 7e253d0012..6f91cd2914 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
@@ -55,9 +55,7 @@ public class ServerMetrics extends AbstractMetrics<ServerQueryPhase, ServerMeter
    * should always call after registration
    */
   public static ServerMetrics get() {
-    ServerMetrics ret = SERVER_METRICS_INSTANCE.get();
-    assert ret != null;
-    return ret;
+    return SERVER_METRICS_INSTANCE.get();
   }
 
   public ServerMetrics(PinotMetricsRegistry metricsRegistry) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index 61d7f06171..2217c0e648 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -19,13 +19,19 @@
 package org.apache.pinot.query.mailbox.channel;
 
 import io.grpc.Server;
-import io.grpc.ServerBuilder;
 import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
+import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator;
+import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric;
+import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
@@ -49,24 +55,48 @@ public class GrpcMailboxServer extends PinotMailboxGrpc.PinotMailboxImplBase {
   public GrpcMailboxServer(MailboxService mailboxService, PinotConfiguration config, @Nullable TlsConfig tlsConfig) {
     _mailboxService = mailboxService;
     int port = mailboxService.getPort();
+
+    PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true);
+    PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+
+    // Register memory metrics - use ServerMetrics if available, otherwise use BrokerMetrics
+    ServerMetrics serverMetrics = ServerMetrics.get();
+    BrokerMetrics brokerMetrics = BrokerMetrics.get();
+    if (serverMetrics != null) {
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_USED_DIRECT_MEMORY, metric::usedDirectMemory);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_USED_HEAP_MEMORY, metric::usedHeapMemory);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_ARENAS_DIRECT, metric::numDirectArenas);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_ARENAS_HEAP, metric::numHeapArenas);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_CACHE_SIZE_SMALL, metric::smallCacheSize);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_CACHE_SIZE_NORMAL, metric::normalCacheSize);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_THREADLOCALCACHE, metric::numThreadLocalCaches);
+      serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_CHUNK_SIZE, metric::chunkSize);
+    } else if (brokerMetrics != null) {
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_USED_DIRECT_MEMORY, metric::usedDirectMemory);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_USED_HEAP_MEMORY, metric::usedHeapMemory);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_ARENAS_DIRECT, metric::numDirectArenas);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_ARENAS_HEAP, metric::numHeapArenas);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_CACHE_SIZE_SMALL, metric::smallCacheSize);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_CACHE_SIZE_NORMAL, metric::normalCacheSize);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_THREADLOCALCACHE, metric::numThreadLocalCaches);
+      brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_CHUNK_SIZE, metric::chunkSize);
+    }
+
+    NettyServerBuilder builder = NettyServerBuilder
+        .forPort(port)
+        .addService(this)
+        .withOption(ChannelOption.ALLOCATOR, bufAllocator)
+        .withChildOption(ChannelOption.ALLOCATOR, bufAllocator)
+        .maxInboundMessageSize(config.getProperty(
+            CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+            CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES));
+
+    // Add SSL context only if TLS is configured
     if (tlsConfig != null) {
-      _server = NettyServerBuilder
-          .forPort(port)
-          .addService(this)
-          .sslContext(GrpcQueryServer.buildGrpcSslContext(tlsConfig))
-          .maxInboundMessageSize(config.getProperty(
-              CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
-              CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES))
-          .build();
-    } else {
-      _server = ServerBuilder
-          .forPort(port)
-          .addService(this)
-          .maxInboundMessageSize(config.getProperty(
-              CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
-              CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES))
-          .build();
+      builder.sslContext(GrpcQueryServer.buildGrpcSslContext(tlsConfig));
     }
+
+    _server = builder.build();
   }
 
   public void start() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org