This is an automated email from the ASF dual-hosted git repository.

mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new efa43007ad Add configs to specify keepAlive and shutdownTimeout for 
GrpcQueryClient. (#13546)
efa43007ad is described below

commit efa43007adc1dd7736580d882f33956e359b0678
Author: Mayank Shrivastava <maya...@apache.org>
AuthorDate: Mon Jul 8 21:27:14 2024 -0700

    Add configs to specify keepAlive and shutdownTimeout for GrpcQueryClient. 
(#13546)
    
    - Added `channelShutdownTimeoutSeconds` config for `GrpcQueryClient` with a 
default of 10s.
    - Added configs for keep-alive:
      - Keep-alive is disabled by default; there is no separate 
`channelKeepAliveEnabled` flag.
      - `channelKeepAliveTimeSeconds` configures the interval for sending 
keep-alive pings,
         default -1 (disabled); set a value > 0 to enable keep-alive.
      - `channelKeepAliveTimeoutSeconds` configures the timeout for waiting for 
a ping acknowledgment,
         default 20 seconds.
      - `channelKeepAliveWithoutCalls` ensures pings are sent even when there 
are no active calls,
         keeping the connection alive during idle periods, default true.
---
 .../org/apache/pinot/common/config/GrpcConfig.java | 31 ++++++++++++++++++++++
 .../pinot/common/utils/grpc/GrpcQueryClient.java   | 25 ++++++++++++-----
 2 files changed, 50 insertions(+), 6 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java 
b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
index 68a6b790a3..ed2d32a736 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
@@ -28,6 +28,20 @@ public class GrpcConfig {
   public static final String GRPC_TLS_PREFIX = "tls";
   public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText";
   public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE = 
"maxInboundMessageSizeBytes";
+
+  private static final String CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS = 
"channelShutdownTimeoutSeconds";
+  private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS = 10;
+
+  // KeepAlive configs
+  private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS = 
"channelKeepAliveTimeSeconds";
+  private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS = -1; // 
Set value > 0 to enable keep alive
+
+  private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = 
"channelKeepAliveTimeoutSeconds";
+  private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = 20; // 
20 seconds
+
+  private static final String CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = 
"channelKeepAliveWithoutCalls";
+  private static final boolean DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = true;
+
   // Default max message size to 128MB
   public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 
* 1024;
   // Default use plain text for transport
@@ -69,6 +83,23 @@ public class GrpcConfig {
     return 
Boolean.parseBoolean(_pinotConfig.getProperty(CONFIG_USE_PLAIN_TEXT, 
DEFAULT_IS_USE_PLAIN_TEXT));
   }
 
+  public int getChannelShutdownTimeoutSecond() {
+    return _pinotConfig.getProperty(CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS, 
DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS);
+  }
+
+  public int getChannelKeepAliveTimeSeconds() {
+    return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS, 
DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS);
+  }
+
+  public int getChannelKeepAliveTimeoutSeconds() {
+    return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS,
+        DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS);
+  }
+
+  public boolean isChannelKeepAliveWithoutCalls() {
+    return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS, 
DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS);
+  }
+
   public TlsConfig getTlsConfig() {
     return _tlsConfig;
   }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index a41a30c5d4..9987876b95 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 
 public class GrpcQueryClient implements Closeable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcQueryClient.class);
-  private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = 10;
+
   // the key is the hashCode of the TlsConfig, the value is the SslContext
   // We don't use TlsConfig as the map key because the TlsConfig is mutable, 
which means the hashCode can change. If the
   // hashCode changes and the map is resized, the SslContext of the old 
hashCode will be lost.
@@ -53,22 +53,35 @@ public class GrpcQueryClient implements Closeable {
 
   private final ManagedChannel _managedChannel;
   private final PinotQueryServerGrpc.PinotQueryServerBlockingStub 
_blockingStub;
+  private final int _channelShutdownTimeoutSeconds;
 
   public GrpcQueryClient(String host, int port) {
     this(host, port, new GrpcConfig(Collections.emptyMap()));
   }
 
   public GrpcQueryClient(String host, int port, GrpcConfig config) {
+    ManagedChannelBuilder<?> channelBuilder;
     if (config.isUsePlainText()) {
-      _managedChannel =
+      channelBuilder =
           ManagedChannelBuilder.forAddress(host, 
port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
-              .usePlaintext().build();
+              .usePlaintext();
     } else {
-      _managedChannel =
+      channelBuilder =
           NettyChannelBuilder.forAddress(host, 
port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
-              .sslContext(buildSslContext(config.getTlsConfig())).build();
+              .sslContext(buildSslContext(config.getTlsConfig()));
+    }
+
+    // Set keep alive configs, if enabled
+    int channelKeepAliveTimeSeconds = config.getChannelKeepAliveTimeSeconds();
+    if (channelKeepAliveTimeSeconds > 0) {
+      channelBuilder.keepAliveTime(channelKeepAliveTimeSeconds, 
TimeUnit.SECONDS)
+          .keepAliveTimeout(config.getChannelKeepAliveTimeoutSeconds(), 
TimeUnit.SECONDS)
+          .keepAliveWithoutCalls(config.isChannelKeepAliveWithoutCalls());
     }
+
+    _managedChannel = channelBuilder.build();
     _blockingStub = PinotQueryServerGrpc.newBlockingStub(_managedChannel);
+    _channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond();
   }
 
   private SslContext buildSslContext(TlsConfig tlsConfig) {
@@ -103,7 +116,7 @@ public class GrpcQueryClient implements Closeable {
     if (!_managedChannel.isShutdown()) {
       try {
         _managedChannel.shutdownNow();
-        if 
(!_managedChannel.awaitTermination(DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND, 
TimeUnit.SECONDS)) {
+        if (!_managedChannel.awaitTermination(_channelShutdownTimeoutSeconds, 
TimeUnit.SECONDS)) {
           LOGGER.warn("Timed out forcefully shutting down connection: {}. ", 
_managedChannel);
         }
       } catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to