This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 4f018c2712 throttling logic (#15686)

4f018c2712 is described below

commit 4f018c271250efc02cfea31968f9599499cbf8e5
Author: Songqiao Su <andysongq...@gmail.com>
AuthorDate: Thu May 1 09:42:43 2025 -0700

    throttling logic (#15686)
---
 .../org/apache/pinot/common/config/GrpcConfig.java |  14 ++++++++++
 .../apache/pinot/common/metrics/ServerMeter.java   |   2 ++
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  32 +++++++++++++++++++---
 3 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
index ed2d32a736..16ba0323d3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
@@ -49,6 +49,11 @@ public class GrpcConfig {
 
   public static final String CONFIG_QUERY_WORKER_THREADS = "queryWorkerThreads";
 
+  // Used-direct-memory level (bytes) of the gRPC buffer allocator above which new requests are rejected
+  public static final String REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES = "requestThrottlingMemoryThresholdBytes";
+  // Default threshold: 16 GiB
+  public static final long DEFAULT_REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES = 16 * 1024 * 1024 * 1024L;
+
   private final TlsConfig _tlsConfig;
   private final PinotConfiguration _pinotConfig;
 
@@ -112,6 +117,15 @@ public class GrpcConfig {
     return _pinotConfig.getProperty(CONFIG_QUERY_WORKER_THREADS, Integer.class);
   }
 
+  /**
+   * Returns the memory threshold (in bytes) that triggers request throttling, or
+   * {@link #DEFAULT_REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES} when the property is not configured.
+   */
+  public long getRequestThrottlingMemoryThresholdBytes() {
+    return _pinotConfig.getProperty(REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES,
+        DEFAULT_REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES);
+  }
+
   public boolean isQueryWorkerThreadsSet() {
     return _pinotConfig.containsKey(CONFIG_QUERY_WORKER_THREADS);
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 61270c4060..0e960a1038 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -133,6 +133,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
   LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false),
 
+  GRPC_MEMORY_REJECTIONS("rejections", true, "Number of grpc requests rejected due to memory pressure"),
+
   DIRECT_MEMORY_OOM("directMemoryOOMCount", true),
 
   // Multi-stage
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 628d7a283a..e13146d833 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -82,6 +82,9 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
   private final ExecutorService _executorService;
   private final AccessControl _accessControl;
   private final ServerQueryLogger _queryLogger = ServerQueryLogger.getInstance();
+  // Memory allocator and throttling configuration
+  private final PooledByteBufAllocator _bufAllocator;
+  private final long _memoryThresholdBytes;
 
   // Filter to keep track of gRPC connections.
   private class GrpcQueryTransportFilter extends ServerTransportFilter {
@@ -114,6 +117,15 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
     _queryExecutor = queryExecutor;
     _serverMetrics = serverMetrics;
 
+    _bufAllocator = new PooledByteBufAllocator(true);
+    // GrpcConfig falls back to DEFAULT_REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES when the property is unset.
+    _memoryThresholdBytes = config.getRequestThrottlingMemoryThresholdBytes();
+    // A non-positive threshold would reject essentially every request; fail fast on such a misconfiguration.
+    if (_memoryThresholdBytes <= 0) {
+      throw new IllegalArgumentException(
+          GrpcConfig.REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES + " must be positive, got: " + _memoryThresholdBytes);
+    }
+
     try {
       NettyServerBuilder builder = NettyServerBuilder.forPort(port);
       if (tlsConfig != null) {
@@ -121,8 +133,7 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
       }
 
       // Add metrics for Netty buffer allocator
-      PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true);
-      PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+      PooledByteBufAllocatorMetric metric = _bufAllocator.metric();
       ServerMetrics metrics = ServerMetrics.get();
       metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_USED_DIRECT_MEMORY, metric::usedDirectMemory);
       metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_USED_HEAP_MEMORY, metric::usedHeapMemory);
@@ -137,8 +148,8 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
           .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
           .addService(this)
           .addTransportFilter(new GrpcQueryTransportFilter())
-          .withOption(ChannelOption.ALLOCATOR, bufAllocator)
-          .withChildOption(ChannelOption.ALLOCATOR, bufAllocator)
+          .withOption(ChannelOption.ALLOCATOR, _bufAllocator)
+          .withChildOption(ChannelOption.ALLOCATOR, _bufAllocator)
           .build();
     } catch (Exception e) {
       throw new RuntimeException("Failed to start secure grpcQueryServer", e);
@@ -197,6 +208,19 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
     _serverMetrics.addMeteredGlobalValue(ServerMeter.GRPC_QUERIES, 1);
     _serverMetrics.addMeteredGlobalValue(ServerMeter.GRPC_BYTES_RECEIVED, request.getSerializedSize());
 
+    // Reject before deserializing/executing the request if the gRPC allocator's direct memory exceeds the threshold
+    long usedDirectMemory = _bufAllocator.metric().usedDirectMemory();
+    if (usedDirectMemory > _memoryThresholdBytes) {
+      LOGGER.warn("Request rejected due to memory pressure. Used direct memory: {} bytes, threshold: {} bytes",
+          usedDirectMemory, _memoryThresholdBytes);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.GRPC_MEMORY_REJECTIONS, 1);
+      responseObserver.onError(Status.RESOURCE_EXHAUSTED
+          .withDescription(String.format("Server under memory pressure (used: %d bytes, threshold: %d bytes)",
+              usedDirectMemory, _memoryThresholdBytes))
+          .asException());
+      return;
+    }
+
     try (QueryThreadContext.CloseableContext closeme = QueryThreadContext.open()) {
       QueryThreadContext.setQueryEngine("sse-grpc");
       // Deserialize the request

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org