This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f018c2712 throttling logic (#15686)
4f018c2712 is described below

commit 4f018c271250efc02cfea31968f9599499cbf8e5
Author: Songqiao Su <andysongq...@gmail.com>
AuthorDate: Thu May 1 09:42:43 2025 -0700

    throttling logic (#15686)
---
 .../org/apache/pinot/common/config/GrpcConfig.java | 13 +++++++++
 .../apache/pinot/common/metrics/ServerMeter.java   |  2 ++
 .../pinot/core/transport/grpc/GrpcQueryServer.java | 31 +++++++++++++++++++---
 3 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
index ed2d32a736..16ba0323d3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
@@ -49,6 +49,11 @@ public class GrpcConfig {
 
   public static final String CONFIG_QUERY_WORKER_THREADS = "queryWorkerThreads";
 
+  // memory usage threshold that triggers request throttling
+  public static final String REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES = "requestThrottlingMemoryThresholdBytes";
+  // Default threshold in bytes (16GB)
+  public static final long DEFAULT_REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES = 16 * 1024 * 1024 * 1024L;
+
   private final TlsConfig _tlsConfig;
   private final PinotConfiguration _pinotConfig;
 
@@ -112,6 +117,14 @@ public class GrpcConfig {
     return _pinotConfig.getProperty(CONFIG_QUERY_WORKER_THREADS, Integer.class);
   }
 
+  public long getRequestThrottlingMemoryThresholdBytes() {
+    return _pinotConfig.getProperty(REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES, Long.class);
+  }
+
+  public boolean isRequestThrottlingMemroyThresholdSet() {
+    return _pinotConfig.containsKey(REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES);
+  }
+
   public boolean isQueryWorkerThreadsSet() {
     return _pinotConfig.containsKey(CONFIG_QUERY_WORKER_THREADS);
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 61270c4060..0e960a1038 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -133,6 +133,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
   LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false),
 
+  GRPC_MEMORY_REJECTIONS("rejections", true, "Number of grpc requests rejected due to memory pressure"),
+
   DIRECT_MEMORY_OOM("directMemoryOOMCount", true),
 
   // Multi-stage
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 628d7a283a..e13146d833 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -82,6 +82,9 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
   private final ExecutorService _executorService;
   private final AccessControl _accessControl;
   private final ServerQueryLogger _queryLogger = ServerQueryLogger.getInstance();
+  // Memory allocator and throttling configuration
+  private final PooledByteBufAllocator _bufAllocator;
+  private final long _memoryThresholdBytes;
 
   // Filter to keep track of gRPC connections.
   private class GrpcQueryTransportFilter extends ServerTransportFilter {
@@ -114,6 +117,14 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
     _queryExecutor = queryExecutor;
     _serverMetrics = serverMetrics;
 
+    _bufAllocator = new PooledByteBufAllocator(true);
+
+    if (config.isRequestThrottlingMemroyThresholdSet()) {
+      _memoryThresholdBytes = config.getRequestThrottlingMemoryThresholdBytes();
+    } else {
+      _memoryThresholdBytes = GrpcConfig.DEFAULT_REQUEST_THROTTLING_MEMORY_THRESHOLD_BYTES;
+    }
+
     try {
       NettyServerBuilder builder = NettyServerBuilder.forPort(port);
       if (tlsConfig != null) {
@@ -121,8 +132,7 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
       }
 
       // Add metrics for Netty buffer allocator
-      PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true);
-      PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+      PooledByteBufAllocatorMetric metric = _bufAllocator.metric();
       ServerMetrics metrics = ServerMetrics.get();
       metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_USED_DIRECT_MEMORY, metric::usedDirectMemory);
       metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_USED_HEAP_MEMORY, metric::usedHeapMemory);
@@ -137,8 +147,8 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
           .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
           .addService(this)
           .addTransportFilter(new GrpcQueryTransportFilter())
-          .withOption(ChannelOption.ALLOCATOR, bufAllocator)
-          .withChildOption(ChannelOption.ALLOCATOR, bufAllocator)
+          .withOption(ChannelOption.ALLOCATOR, _bufAllocator)
+          .withChildOption(ChannelOption.ALLOCATOR, _bufAllocator)
           .build();
     } catch (Exception e) {
       throw new RuntimeException("Failed to start secure grpcQueryServer", e);
@@ -197,6 +207,19 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
     _serverMetrics.addMeteredGlobalValue(ServerMeter.GRPC_QUERIES, 1);
     _serverMetrics.addMeteredGlobalValue(ServerMeter.GRPC_BYTES_RECEIVED, request.getSerializedSize());
 
+    // Check memory usage before processing the request
+    long usedDirectMemory = _bufAllocator.metric().usedDirectMemory();
+    if (usedDirectMemory > _memoryThresholdBytes) {
+      LOGGER.warn("Request rejected due to memory pressure. Used direct memory: {} bytes, threshold: {} bytes",
+          usedDirectMemory, _memoryThresholdBytes);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.GRPC_MEMORY_REJECTIONS, 1);
+      responseObserver.onError(Status.RESOURCE_EXHAUSTED
+          .withDescription(String.format("Server under memory pressure (used: %d bytes, threshold: %d bytes)",
+              usedDirectMemory, _memoryThresholdBytes))
+          .asException());
+      return;
+    }
+
     try (QueryThreadContext.CloseableContext closeme = QueryThreadContext.open()) {
       QueryThreadContext.setQueryEngine("sse-grpc");
       // Deserialize the request


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to