This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a1e39b59b65 Fixing grpc maxInboundMessageSize usage in the GRPC ChannelManager (#16567)
a1e39b59b65 is described below

commit a1e39b59b65370aac016522b3fbc117992b40f03
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Aug 11 01:13:35 2025 -0700

    Fixing grpc maxInboundMessageSize usage in the GRPC ChannelManager (#16567)
---
 .../java/org/apache/pinot/query/mailbox/MailboxService.java   | 11 ++++++-----
 .../apache/pinot/query/mailbox/channel/ChannelManager.java    | 11 +++++------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index e8b778d4ee9..a81c1c6f448 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -78,16 +78,17 @@ public class MailboxService {
     _port = port;
     _config = config;
     _tlsConfig = tlsConfig;
-    _channelManager = new ChannelManager(tlsConfig);
+    int maxInboundMessageSize = config.getProperty(
+        CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+        CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
+    );
+    _channelManager = new ChannelManager(tlsConfig, maxInboundMessageSize);
     boolean splitBlocks = config.getProperty(
         CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT,
         CommonConstants.MultiStageQueryRunner.DEFAULT_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT);
     if (splitBlocks) {
       // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
-      _maxByteStringSize = Math.max(config.getProperty(
-          CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
-          CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
-      ) / 2, 1);
+      _maxByteStringSize = Math.max(maxInboundMessageSize / 2, 1);
     } else {
       _maxByteStringSize = 0;
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index c077db06648..8d63b28aa96 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -26,7 +26,6 @@ import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient;
-import org.apache.pinot.spi.utils.CommonConstants;
 
 
 /**
@@ -38,9 +37,11 @@ import org.apache.pinot.spi.utils.CommonConstants;
 public class ChannelManager {
   private final ConcurrentHashMap<Pair<String, Integer>, ManagedChannel> _channelMap = new ConcurrentHashMap<>();
   private final TlsConfig _tlsConfig;
+  private final int _maxInboundMessageSize;
 
-  public ChannelManager(@Nullable TlsConfig tlsConfig) {
+  public ChannelManager(@Nullable TlsConfig tlsConfig, int maxInboundMessageSize) {
     _tlsConfig = tlsConfig;
+    _maxInboundMessageSize = maxInboundMessageSize;
   }
 
   public ManagedChannel getChannel(String hostname, int port) {
@@ -49,8 +50,7 @@ public class ChannelManager {
       return _channelMap.computeIfAbsent(Pair.of(hostname, port),
           (k) -> NettyChannelBuilder
               .forAddress(k.getLeft(), k.getRight())
-              .maxInboundMessageSize(
-                  CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)
+              .maxInboundMessageSize(_maxInboundMessageSize)
               .sslContext(ServerGrpcQueryClient.buildSslContext(_tlsConfig))
               .build()
       );
@@ -58,8 +58,7 @@ public class ChannelManager {
       return _channelMap.computeIfAbsent(Pair.of(hostname, port),
           (k) -> ManagedChannelBuilder
               .forAddress(k.getLeft(), k.getRight())
-              .maxInboundMessageSize(
-                  CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)
+              .maxInboundMessageSize(_maxInboundMessageSize)
               .usePlaintext()
               .build());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to