This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 54ee6360ca2 Disable idle mailboxes by default (#16691)
54ee6360ca2 is described below

commit 54ee6360ca29c929524d768abd7cfe21e3c63a21
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Aug 26 13:41:42 2025 +0200

    Disable idle mailboxes by default (#16691)
---
 .../org/apache/pinot/common/config/GrpcConfig.java | 13 +++++-
 .../test/resources/{log4j2.xml => log4j2-test.xml} | 13 ++++++
 .../apache/pinot/query/mailbox/MailboxService.java | 14 ++++++-
 .../query/mailbox/channel/ChannelManager.java      | 46 ++++++++++++++++------
 .../apache/pinot/spi/utils/CommonConstants.java    | 16 ++++++++
 5 files changed, 89 insertions(+), 13 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java 
b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
index 16ba0323d3f..c3276444f64 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
@@ -23,7 +23,18 @@ import java.util.Map;
 import org.apache.pinot.common.utils.tls.TlsUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
-
+/// Configs used for the gRPC **query** service.
+///
+/// Remember that in Pinot we use different gRPC services for different purposes:
+/// - **query**: used by Pinot users for executing queries
+/// - **internal**: used by Pinot for internal communication between servers/brokers in MSE. This includes:
+///    - The ability to send query plans from broker to server, and the 
mailbox service for sending data between
+///      servers/brokers
+///    - The broker -> server communication for the SSE when 
`pinot.broker.request.handler.type` is set to grpc
+///      (see the GrpcBrokerRequestHandler)
+///
+/// This class only affects the **query** service. See ChannelManager, MailboxService and GrpcMailboxServer to learn
+/// more about the gRPC configs used for MSE.
 public class GrpcConfig {
   public static final String GRPC_TLS_PREFIX = "tls";
   public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText";
diff --git a/pinot-integration-tests/src/test/resources/log4j2.xml 
b/pinot-integration-tests/src/test/resources/log4j2-test.xml
similarity index 73%
rename from pinot-integration-tests/src/test/resources/log4j2.xml
rename to pinot-integration-tests/src/test/resources/log4j2-test.xml
index 439331f9d7b..afed36b3132 100644
--- a/pinot-integration-tests/src/test/resources/log4j2.xml
+++ b/pinot-integration-tests/src/test/resources/log4j2-test.xml
@@ -23,12 +23,25 @@
   <Appenders>
     <Console name="console" target="SYSTEM_OUT">
       <PatternLayout pattern="%d{HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n"/>
+      <Filters>
+        <BurstFilter level="ERROR" rate="5" maxBurst="10"/>
+      </Filters>
+    </Console>
+    <Console name="spammy" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n"/>
+      <Filters>
+        <BurstFilter level="ERROR" rate="1" maxBurst="2"/>
+      </Filters>
     </Console>
   </Appenders>
+
   <Loggers>
     <Logger name="org.apache.pinot" level="warn" additivity="false">
       <AppenderRef ref="console"/>
     </Logger>
+    <Logger name="org.apache.pinot.core.accounting" level="warn" 
additivity="false">
+      <AppenderRef ref="spammy"/>
+    </Logger>
     <Root level="error">
       <AppenderRef ref="console"/>
     </Root>
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 8e572afe478..92dae478321 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.mailbox;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
+import java.time.Duration;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
@@ -89,7 +90,7 @@ public class MailboxService {
         
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
         
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
     );
-    _channelManager = new ChannelManager(tlsConfig, maxInboundMessageSize);
+    _channelManager = new ChannelManager(tlsConfig, maxInboundMessageSize, 
getIdleTimeout(config));
     _accessControlFactory = accessControlFactory;
     boolean splitBlocks = config.getProperty(
         
CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT,
@@ -168,4 +169,15 @@ public class MailboxService {
   public void releaseReceivingMailbox(ReceivingMailbox mailbox) {
     _receivingMailboxCache.invalidate(mailbox.getId());
   }
+
+  private static Duration getIdleTimeout(PinotConfiguration config) {
+    long channelIdleTimeoutSeconds = config.getProperty(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_CHANNEL_IDLE_TIMEOUT_SECONDS,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_CHANNEL_IDLE_TIMEOUT_SECONDS);
+    if (channelIdleTimeoutSeconds > 0) {
+      return Duration.ofSeconds(channelIdleTimeoutSeconds);
+    }
+    // Non-positive values disable the timeout. gRPC only accepts positive values, so use 1 year (avoids overflow).
+    return Duration.ofDays(365);
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index 8d63b28aa96..f0ff135d9df 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -21,7 +21,9 @@ package org.apache.pinot.query.mailbox.channel;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import java.time.Duration;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.config.TlsConfig;
@@ -35,32 +37,54 @@ import 
org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient;
  * query/job/stages.
  */
 public class ChannelManager {
+  /**
+   * Map from (hostname, port) to the ManagedChannel with all known channels
+   */
   private final ConcurrentHashMap<Pair<String, Integer>, ManagedChannel> 
_channelMap = new ConcurrentHashMap<>();
   private final TlsConfig _tlsConfig;
+  /**
+   * The idle timeout for the channels. gRPC rejects non-positive values; timeouts >= 30 days disable idle mode.
+   *
+   * In general we want to prevent the channel from going idle, so that we don't have to re-establish the connection
+   * (including TLS negotiation) before sending any message, which increases the latency of the first query sent after a
+   * period of inactivity. In order to achieve that, we set the idle timeout to a very large value by default.
+   */
+  private final Duration _idleTimeout;
   private final int _maxInboundMessageSize;
 
-  public ChannelManager(@Nullable TlsConfig tlsConfig, int 
maxInboundMessageSize) {
+  public ChannelManager(@Nullable TlsConfig tlsConfig, int 
maxInboundMessageSize, Duration idleTimeout) {
     _tlsConfig = tlsConfig;
     _maxInboundMessageSize = maxInboundMessageSize;
+    _idleTimeout = idleTimeout;
   }
 
   public ManagedChannel getChannel(String hostname, int port) {
     // TODO: Revisit parameters
     if (_tlsConfig != null) {
       return _channelMap.computeIfAbsent(Pair.of(hostname, port),
-          (k) -> NettyChannelBuilder
-              .forAddress(k.getLeft(), k.getRight())
-              .maxInboundMessageSize(_maxInboundMessageSize)
-              .sslContext(ServerGrpcQueryClient.buildSslContext(_tlsConfig))
-              .build()
+          (k) -> {
+            NettyChannelBuilder channelBuilder = NettyChannelBuilder
+                .forAddress(k.getLeft(), k.getRight())
+                .maxInboundMessageSize(
+                    _maxInboundMessageSize)
+                .sslContext(ServerGrpcQueryClient.buildSslContext(_tlsConfig));
+            return decorate(channelBuilder).build();
+          }
       );
     } else {
       return _channelMap.computeIfAbsent(Pair.of(hostname, port),
-          (k) -> ManagedChannelBuilder
-              .forAddress(k.getLeft(), k.getRight())
-              .maxInboundMessageSize(_maxInboundMessageSize)
-              .usePlaintext()
-              .build());
+          (k) -> {
+            ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
+                .forAddress(k.getLeft(), k.getRight())
+                .maxInboundMessageSize(
+                    _maxInboundMessageSize)
+                .usePlaintext();
+            return decorate(channelBuilder).build();
+          });
     }
   }
+
+  private ManagedChannelBuilder<?> decorate(ManagedChannelBuilder<?> builder) {
+    return builder.idleTimeout(_idleTimeout.getSeconds(), TimeUnit.SECONDS);
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1b8fbc16a96..4d2a61c965a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1903,6 +1903,22 @@ public class CommonConstants {
     public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES 
= "pinot.query.runner.max.msg.size.bytes";
     public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = 
16 * 1024 * 1024;
 
+
+    /**
+     * Configuration for channel idle timeout in seconds.
+     *
+     * gRPC channels go idle after a period of inactivity. When a channel is idle, its resources are released. The next
+     * query using the channel needs to re-establish the connection. This includes the TLS negotiation and therefore
+     * can increase the latency of that query by a few milliseconds.
+     *
+     * In Pinot clusters that continuously serve queries, channels should never go idle, but clusters that serve
+     * queries only occasionally may be affected.
+     * This is why the channel idle timeout is set to -1 by default. Any non-positive value disables the channel idle
+     * timeout.
+     */
+    public static final String KEY_OF_CHANNEL_IDLE_TIMEOUT_SECONDS = 
"pinot.query.runner.channel.idle.timeout.seconds";
+    public static final long DEFAULT_CHANNEL_IDLE_TIMEOUT_SECONDS = -1;
+
     /**
      * Enable splitting of data block payload during mailbox transfer.
      */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to