This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a1e39b59b65 Fixing grpc maxInboundMessageSize usage in the GRPC ChannelManager (#16567)
a1e39b59b65 is described below
commit a1e39b59b65370aac016522b3fbc117992b40f03
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Aug 11 01:13:35 2025 -0700
Fixing grpc maxInboundMessageSize usage in the GRPC ChannelManager (#16567)
---
.../java/org/apache/pinot/query/mailbox/MailboxService.java | 11 ++++++-----
.../apache/pinot/query/mailbox/channel/ChannelManager.java | 11 +++++------
2 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index e8b778d4ee9..a81c1c6f448 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -78,16 +78,17 @@ public class MailboxService {
_port = port;
_config = config;
_tlsConfig = tlsConfig;
- _channelManager = new ChannelManager(tlsConfig);
+ int maxInboundMessageSize = config.getProperty(
+ CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+ CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
+ );
+ _channelManager = new ChannelManager(tlsConfig, maxInboundMessageSize);
boolean splitBlocks = config.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT,
CommonConstants.MultiStageQueryRunner.DEFAULT_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT);
if (splitBlocks) {
// so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
- _maxByteStringSize = Math.max(config.getProperty(
- CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
- CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
- ) / 2, 1);
+ _maxByteStringSize = Math.max(maxInboundMessageSize / 2, 1);
} else {
_maxByteStringSize = 0;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index c077db06648..8d63b28aa96 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -26,7 +26,6 @@ import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient;
-import org.apache.pinot.spi.utils.CommonConstants;
/**
@@ -38,9 +37,11 @@ import org.apache.pinot.spi.utils.CommonConstants;
public class ChannelManager {
private final ConcurrentHashMap<Pair<String, Integer>, ManagedChannel> _channelMap = new ConcurrentHashMap<>();
private final TlsConfig _tlsConfig;
+ private final int _maxInboundMessageSize;
- public ChannelManager(@Nullable TlsConfig tlsConfig) {
+ public ChannelManager(@Nullable TlsConfig tlsConfig, int maxInboundMessageSize) {
_tlsConfig = tlsConfig;
+ _maxInboundMessageSize = maxInboundMessageSize;
}
public ManagedChannel getChannel(String hostname, int port) {
@@ -49,8 +50,7 @@ public class ChannelManager {
return _channelMap.computeIfAbsent(Pair.of(hostname, port),
(k) -> NettyChannelBuilder
.forAddress(k.getLeft(), k.getRight())
- .maxInboundMessageSize(
- CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)
+ .maxInboundMessageSize(_maxInboundMessageSize)
.sslContext(ServerGrpcQueryClient.buildSslContext(_tlsConfig))
.build()
);
@@ -58,8 +58,7 @@ public class ChannelManager {
return _channelMap.computeIfAbsent(Pair.of(hostname, port),
(k) -> ManagedChannelBuilder
.forAddress(k.getLeft(), k.getRight())
- .maxInboundMessageSize(
- CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)
+ .maxInboundMessageSize(_maxInboundMessageSize)
.usePlaintext()
.build());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]