This is an automated email from the ASF dual-hosted git repository. mayanks pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new efa43007ad Add configs to specify keepAlive and shutdownTimeout for GrpcQueryClient. (#13546) efa43007ad is described below commit efa43007adc1dd7736580d882f33956e359b0678 Author: Mayank Shrivastava <maya...@apache.org> AuthorDate: Mon Jul 8 21:27:14 2024 -0700 Add configs to specify keepAlive and shutdownTimeout for GrpcQueryClient. (#13546) - Added `channelShutdownTimeoutSecond` config for `GrpcQueryClient` with default of 10s. - Added configs for keep-alive: - `channelKeepAliveEnabled` to enable/disable the feature, default false. - `channelKeepAliveTimeiSeconds` to configures the interval for sending keep-alive pings, default 300 seconds (5 minutes). - `channelKeepAliveTimeoutSeconds` configures the timeout for waiting for a ping acknowledgment, default 300 seconds (5 minutes). - `channelKeepAliveWithoutCalls` ensures pings are sent even when there are no active calls, keeping the connection alive during idle period, default true. --- .../org/apache/pinot/common/config/GrpcConfig.java | 31 ++++++++++++++++++++++ .../pinot/common/utils/grpc/GrpcQueryClient.java | 25 ++++++++++++----- 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java index 68a6b790a3..ed2d32a736 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java @@ -28,6 +28,20 @@ public class GrpcConfig { public static final String GRPC_TLS_PREFIX = "tls"; public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText"; public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE = "maxInboundMessageSizeBytes"; + + private static final String CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS = "channelShutdownTimeoutSeconds"; + private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS = 10; + + // KeepAlive configs + private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS = "channelKeepAliveTimeSeconds"; + private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS = -1; // Set value > 0 to enable keep alive + + private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = "channelKeepAliveTimeoutSeconds"; + private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = 20; // 20 seconds + + private static final String CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = "channelKeepAliveWithoutCalls"; + private static final boolean DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = true; + // Default max message size to 128MB public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 * 1024; // Default use plain text for transport @@ -69,6 +83,23 @@ public class GrpcConfig { return Boolean.parseBoolean(_pinotConfig.getProperty(CONFIG_USE_PLAIN_TEXT, DEFAULT_IS_USE_PLAIN_TEXT)); } + public int getChannelShutdownTimeoutSecond() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS, DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS); + } + + public int getChannelKeepAliveTimeSeconds() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS, DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS); + } + + public int getChannelKeepAliveTimeoutSeconds() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS, + DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS); + } + + public boolean isChannelKeepAliveWithoutCalls() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS, DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS); + } + public TlsConfig getTlsConfig() { return _tlsConfig; } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java index a41a30c5d4..9987876b95 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; public class GrpcQueryClient implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryClient.class); - private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = 10; + // the key is the hashCode of the TlsConfig, the value is the SslContext // We don't use TlsConfig as the map key because the TlsConfig is mutable, which means the hashCode can change. If the // hashCode changes and the map is resized, the SslContext of the old hashCode will be lost. @@ -53,22 +53,35 @@ public class GrpcQueryClient implements Closeable { private final ManagedChannel _managedChannel; private final PinotQueryServerGrpc.PinotQueryServerBlockingStub _blockingStub; + private final int _channelShutdownTimeoutSeconds; public GrpcQueryClient(String host, int port) { this(host, port, new GrpcConfig(Collections.emptyMap())); } public GrpcQueryClient(String host, int port, GrpcConfig config) { + ManagedChannelBuilder<?> channelBuilder; if (config.isUsePlainText()) { - _managedChannel = + channelBuilder = ManagedChannelBuilder.forAddress(host, port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()) - .usePlaintext().build(); + .usePlaintext(); } else { - _managedChannel = + channelBuilder = NettyChannelBuilder.forAddress(host, port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()) - .sslContext(buildSslContext(config.getTlsConfig())).build(); + .sslContext(buildSslContext(config.getTlsConfig())); + } + + // Set keep alive configs, if enabled + int channelKeepAliveTimeSeconds = config.getChannelKeepAliveTimeSeconds(); + if (channelKeepAliveTimeSeconds > 0) { + channelBuilder.keepAliveTime(channelKeepAliveTimeSeconds, TimeUnit.SECONDS) + .keepAliveTimeout(config.getChannelKeepAliveTimeoutSeconds(), TimeUnit.SECONDS) + .keepAliveWithoutCalls(config.isChannelKeepAliveWithoutCalls()); } + + _managedChannel = channelBuilder.build(); _blockingStub = PinotQueryServerGrpc.newBlockingStub(_managedChannel); + _channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond(); } private SslContext buildSslContext(TlsConfig tlsConfig) { @@ -103,7 +116,7 @@ public class GrpcQueryClient implements Closeable { if (!_managedChannel.isShutdown()) { try { _managedChannel.shutdownNow(); - if (!_managedChannel.awaitTermination(DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND, TimeUnit.SECONDS)) { + if (!_managedChannel.awaitTermination(_channelShutdownTimeoutSeconds, TimeUnit.SECONDS)) { LOGGER.warn("Timed out forcefully shutting down connection: {}. ", _managedChannel); } } catch (Exception e) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org