This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 21ff6bf354 TLS Configuration Support for QueryServer and Dispatch 
Client (#13645)
21ff6bf354 is described below

commit 21ff6bf3545e4fe15fcdee9cc3851a04f85cc08c
Author: Anand Kr Shaw <anandkrshawherit...@gmail.com>
AuthorDate: Wed Sep 18 07:25:30 2024 +0530

    TLS Configuration Support for QueryServer and Dispatch Client (#13645)
---
 .../pinot/common/utils/grpc/GrpcQueryClient.java   |  2 +-
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  2 +-
 .../query/service/dispatch/DispatchClient.java     | 16 ++++++++++++-
 .../pinot/query/service/server/QueryServer.java    | 16 +++++++++++--
 .../service/dispatch/QueryDispatcherTest.java      |  2 +-
 .../query/service/server/QueryServerTest.java      |  2 +-
 .../pinot/server/starter/ServerInstance.java       | 27 +++++++++++-----------
 .../pinot/server/worker/WorkerQueryServer.java     | 20 ++++++++--------
 8 files changed, 56 insertions(+), 31 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index 9987876b95..808e8d0e02 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -84,7 +84,7 @@ public class GrpcQueryClient implements Closeable {
     _channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond();
   }
 
-  private SslContext buildSslContext(TlsConfig tlsConfig) {
+  public static SslContext buildSslContext(TlsConfig tlsConfig) {
     LOGGER.info("Building gRPC SSL context");
     SslContext sslContext = 
CLIENT_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(), 
tlsConfigHashCode -> {
       try {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index daae6d74cc..4d8608c5ea 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -119,7 +119,7 @@ public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBa
         ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
   }
 
-  private SslContext buildGRpcSslContext(TlsConfig tlsConfig)
+  public static SslContext buildGRpcSslContext(TlsConfig tlsConfig)
       throws IllegalArgumentException {
     LOGGER.info("Building gRPC SSL context");
     if (tlsConfig.getKeyStorePath() == null) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
index 5b036930ce..cea23218cd 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
@@ -21,10 +21,14 @@ package org.apache.pinot.query.service.dispatch;
 import io.grpc.Deadline;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
 import io.grpc.stub.StreamObserver;
+import java.util.Collections;
 import java.util.function.Consumer;
+import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.query.routing.QueryServerInstance;
 
 
@@ -41,7 +45,17 @@ class DispatchClient {
   private final PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub;
 
   public DispatchClient(String host, int port) {
-    _channel = ManagedChannelBuilder.forAddress(host, 
port).usePlaintext().build();
+    this(host, port, new GrpcConfig(Collections.emptyMap()));
+  }
+
+  public DispatchClient(String host, int port, GrpcConfig grpcConfig) {
+    if (grpcConfig.isUsePlainText()) {
+      _channel = ManagedChannelBuilder.forAddress(host, 
port).usePlaintext().build();
+    } else {
+      _channel =
+          NettyChannelBuilder.forAddress(host, port)
+              
.sslContext(GrpcQueryClient.buildSslContext(grpcConfig.getTlsConfig())).build();
+    }
     _dispatchStub = PinotQueryWorkerGrpc.newStub(_channel);
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 763192e16e..65e4ca7df0 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.service.server;
 
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
 import io.grpc.stub.StreamObserver;
 import java.util.List;
 import java.util.Map;
@@ -27,10 +28,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
 import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
 import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.StagePlan;
@@ -55,6 +58,7 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
 
   private final int _port;
   private final QueryRunner _queryRunner;
+  private final TlsConfig _tlsConfig;
   // query submission service is only used for plan submission for now.
   // TODO: with complex query submission logic we should allow asynchronous 
query submission return instead of
   //   directly return from submission response observer.
@@ -62,9 +66,10 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
 
   private Server _server = null;
 
-  public QueryServer(int port, QueryRunner queryRunner) {
+  public QueryServer(int port, QueryRunner queryRunner, TlsConfig tlsConfig) {
     _port = port;
     _queryRunner = queryRunner;
+    _tlsConfig = tlsConfig;
     _querySubmissionExecutorService =
         Executors.newCachedThreadPool(new 
NamedThreadFactory("query_submission_executor_on_" + _port + "_port"));
   }
@@ -73,7 +78,14 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
     LOGGER.info("Starting QueryServer");
     try {
       if (_server == null) {
-        _server = 
ServerBuilder.forPort(_port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
+        if (_tlsConfig == null) {
+          _server = ServerBuilder.forPort(_port).addService(this)
+              .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
+        } else {
+          _server = NettyServerBuilder.forPort(_port).addService(this)
+              .sslContext(GrpcQueryServer.buildGRpcSslContext(_tlsConfig))
+              .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
+        }
         LOGGER.info("Initialized QueryServer on port: {}", _port);
       }
       _queryRunner.start();
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 694fb3c087..c21c40b2d9 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -60,7 +60,7 @@ public class QueryDispatcherTest extends QueryTestSet {
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
-      QueryServer queryServer = Mockito.spy(new QueryServer(availablePort, 
queryRunner));
+      QueryServer queryServer = Mockito.spy(new QueryServer(availablePort, 
queryRunner, null));
       queryServer.start();
       _queryServerMap.put(availablePort, queryServer);
     }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index 3a0b23408e..7a14a2a4c6 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -76,7 +76,7 @@ public class QueryServerTest extends QueryTestSet {
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = mock(QueryRunner.class);
-      QueryServer queryServer = new QueryServer(availablePort, queryRunner);
+      QueryServer queryServer = new QueryServer(availablePort, queryRunner, 
null);
       queryServer.start();
       _queryServerMap.put(availablePort, queryServer);
       _queryRunnerMap.put(availablePort, queryRunner);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 2a75ca7f5a..01f4402710 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -119,15 +119,15 @@ public class ServerInstance {
         TlsUtils.extractTlsConfig(serverConf.getPinotConfig(), 
CommonConstants.Server.SERVER_TLS_PREFIX);
     NettyConfig nettyConfig =
         NettyConfig.extractNettyConfig(serverConf.getPinotConfig(), 
CommonConstants.Server.SERVER_NETTY_PREFIX);
-    accessControlFactory
-        
.init(serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
-            helixManager);
+    accessControlFactory.init(
+        
serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
 helixManager);
     _accessControl = accessControlFactory.create();
 
     if (serverConf.isMultiStageServerEnabled()) {
       LOGGER.info("Initializing Multi-stage query engine");
-      _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), 
_instanceDataManager, helixManager,
-          _serverMetrics);
+      _workerQueryServer =
+          new WorkerQueryServer(serverConf.getPinotConfig(), 
_instanceDataManager, helixManager, _serverMetrics,
+              serverConf.isNettyTlsServerEnabled() ? tlsConfig : null);
     } else {
       _workerQueryServer = null;
     }
@@ -135,9 +135,9 @@ public class ServerInstance {
     if (serverConf.isNettyServerEnabled()) {
       int nettyPort = serverConf.getNettyPort();
       LOGGER.info("Initializing Netty query server on port: {}", nettyPort);
-      _instanceRequestHandler = ChannelHandlerFactory
-          .getInstanceRequestHandler(helixManager.getInstanceName(), 
serverConf.getPinotConfig(), _queryScheduler,
-              _serverMetrics, new AllowAllAccessFactory().create());
+      _instanceRequestHandler =
+          
ChannelHandlerFactory.getInstanceRequestHandler(helixManager.getInstanceName(), 
serverConf.getPinotConfig(),
+              _queryScheduler, _serverMetrics, new 
AllowAllAccessFactory().create());
       _nettyQueryServer = new QueryServer(nettyPort, nettyConfig, 
_instanceRequestHandler);
     } else {
       _nettyQueryServer = null;
@@ -146,9 +146,9 @@ public class ServerInstance {
     if (serverConf.isNettyTlsServerEnabled()) {
       int nettySecPort = serverConf.getNettyTlsPort();
       LOGGER.info("Initializing TLS-secured Netty query server on port: {}", 
nettySecPort);
-      _instanceRequestHandler = ChannelHandlerFactory
-          .getInstanceRequestHandler(helixManager.getInstanceName(), 
serverConf.getPinotConfig(), _queryScheduler,
-              _serverMetrics, _accessControl);
+      _instanceRequestHandler =
+          
ChannelHandlerFactory.getInstanceRequestHandler(helixManager.getInstanceName(), 
serverConf.getPinotConfig(),
+              _queryScheduler, _serverMetrics, _accessControl);
       _nettyTlsQueryServer = new QueryServer(nettySecPort, nettyConfig, 
tlsConfig, _instanceRequestHandler);
     } else {
       _nettyTlsQueryServer = null;
@@ -157,9 +157,8 @@ public class ServerInstance {
       int grpcPort = serverConf.getGrpcPort();
       LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
       _grpcQueryServer = new GrpcQueryServer(grpcPort, 
GrpcConfig.buildGrpcQueryConfig(serverConf.getPinotConfig()),
-          serverConf.isGrpcTlsServerEnabled() ? TlsUtils
-              .extractTlsConfig(serverConf.getPinotConfig(), 
CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null,
-          _queryExecutor, _serverMetrics, _accessControl);
+          serverConf.isGrpcTlsServerEnabled() ? 
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
+              CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, 
_queryExecutor, _serverMetrics, _accessControl);
     } else {
       _grpcQueryServer = null;
     }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 45db3208ec..542cc9bd90 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.server.worker;
 
 import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.query.runtime.QueryRunner;
@@ -37,19 +38,20 @@ public class WorkerQueryServer {
   private QueryRunner _queryRunner;
   private InstanceDataManager _instanceDataManager;
   private ServerMetrics _serverMetrics;
+  private TlsConfig _tlsConfig;
 
   public WorkerQueryServer(PinotConfiguration configuration, 
InstanceDataManager instanceDataManager,
-      HelixManager helixManager, ServerMetrics serverMetrics) {
+      HelixManager helixManager, ServerMetrics serverMetrics, TlsConfig 
tlsConfig) {
     _configuration = toWorkerQueryConfig(configuration);
     _helixManager = helixManager;
     _instanceDataManager = instanceDataManager;
+    _tlsConfig = tlsConfig;
     _serverMetrics = serverMetrics;
-    _queryServicePort =
-        
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
-            CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
+    _queryServicePort = 
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
+        CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
     _queryRunner = new QueryRunner();
     _queryRunner.init(_configuration, _instanceDataManager, _helixManager, 
_serverMetrics);
-    _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner);
+    _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner, 
_tlsConfig);
   }
 
   private static PinotConfiguration toWorkerQueryConfig(PinotConfiguration 
configuration) {
@@ -62,17 +64,15 @@ public class WorkerQueryServer {
           CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceId;
       
newConfig.addProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME,
 hostname);
     }
-    int runnerPort = newConfig.getProperty(
-        CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+    int runnerPort = 
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
         CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
     if (runnerPort == -1) {
       runnerPort =
           newConfig.getProperty(CommonConstants.Server.CONFIG_OF_GRPC_PORT, 
CommonConstants.Server.DEFAULT_GRPC_PORT);
       
newConfig.addProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
 runnerPort);
     }
-    int servicePort =
-        
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
-            CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
+    int servicePort = 
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
+        CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
     if (servicePort == -1) {
       servicePort = 
newConfig.getProperty(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT,
           CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to