This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 736733d553 Reverting https://github.com/apache/pinot/pull/13645 PR 
(#14259)
736733d553 is described below

commit 736733d5536307de4d27cc547a87b8cc9ab1ee85
Author: soumitra-st <127247229+soumitra...@users.noreply.github.com>
AuthorDate: Tue Oct 22 10:56:16 2024 -0700

    Reverting https://github.com/apache/pinot/pull/13645 PR (#14259)
---
 .../pinot/common/utils/grpc/GrpcQueryClient.java   |  2 +-
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  2 +-
 .../query/service/dispatch/DispatchClient.java     | 16 +------------
 .../pinot/query/service/server/QueryServer.java    | 16 ++-----------
 .../service/dispatch/QueryDispatcherTest.java      |  2 +-
 .../query/service/server/QueryServerTest.java      |  2 +-
 .../pinot/server/starter/ServerInstance.java       | 27 +++++++++++-----------
 .../pinot/server/worker/WorkerQueryServer.java     | 20 ++++++++--------
 8 files changed, 31 insertions(+), 56 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index 808e8d0e02..9987876b95 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -84,7 +84,7 @@ public class GrpcQueryClient implements Closeable {
     _channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond();
   }
 
-  public static SslContext buildSslContext(TlsConfig tlsConfig) {
+  private SslContext buildSslContext(TlsConfig tlsConfig) {
     LOGGER.info("Building gRPC SSL context");
     SslContext sslContext = 
CLIENT_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(), 
tlsConfigHashCode -> {
       try {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 4d8608c5ea..daae6d74cc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -119,7 +119,7 @@ public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBa
         ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
   }
 
-  public static SslContext buildGRpcSslContext(TlsConfig tlsConfig)
+  private SslContext buildGRpcSslContext(TlsConfig tlsConfig)
       throws IllegalArgumentException {
     LOGGER.info("Building gRPC SSL context");
     if (tlsConfig.getKeyStorePath() == null) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
index 4665b23a48..cbb8be1e6c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
@@ -21,15 +21,11 @@ package org.apache.pinot.query.service.dispatch;
 import io.grpc.Deadline;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
 import io.grpc.stub.StreamObserver;
-import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
-import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.query.routing.QueryServerInstance;
 
 
@@ -46,17 +42,7 @@ class DispatchClient {
   private final PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub;
 
   public DispatchClient(String host, int port) {
-    this(host, port, new GrpcConfig(Collections.emptyMap()));
-  }
-
-  public DispatchClient(String host, int port, GrpcConfig grpcConfig) {
-    if (grpcConfig.isUsePlainText()) {
-      _channel = ManagedChannelBuilder.forAddress(host, 
port).usePlaintext().build();
-    } else {
-      _channel =
-          NettyChannelBuilder.forAddress(host, port)
-              
.sslContext(GrpcQueryClient.buildSslContext(grpcConfig.getTlsConfig())).build();
-    }
+    _channel = ManagedChannelBuilder.forAddress(host, 
port).usePlaintext().build();
     _dispatchStub = PinotQueryWorkerGrpc.newStub(_channel);
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 36104011f3..d8a1ecfef8 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.service.server;
 import com.google.protobuf.ByteString;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
-import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
 import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.List;
@@ -34,12 +33,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.utils.NamedThreadFactory;
-import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
 import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
 import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
 import org.apache.pinot.query.routing.StageMetadata;
@@ -65,7 +62,6 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
 
   private final int _port;
   private final QueryRunner _queryRunner;
-  private final TlsConfig _tlsConfig;
   // query submission service is only used for plan submission for now.
   // TODO: with complex query submission logic we should allow asynchronous 
query submission return instead of
   //   directly return from submission response observer.
@@ -73,10 +69,9 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
 
   private Server _server = null;
 
-  public QueryServer(int port, QueryRunner queryRunner, TlsConfig tlsConfig) {
+  public QueryServer(int port, QueryRunner queryRunner) {
     _port = port;
     _queryRunner = queryRunner;
-    _tlsConfig = tlsConfig;
     _querySubmissionExecutorService =
         Executors.newCachedThreadPool(new 
NamedThreadFactory("query_submission_executor_on_" + _port + "_port"));
   }
@@ -85,14 +80,7 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
     LOGGER.info("Starting QueryServer");
     try {
       if (_server == null) {
-        if (_tlsConfig == null) {
-          _server = ServerBuilder.forPort(_port).addService(this)
-              .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
-        } else {
-          _server = NettyServerBuilder.forPort(_port).addService(this)
-              .sslContext(GrpcQueryServer.buildGRpcSslContext(_tlsConfig))
-              .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
-        }
+        _server = 
ServerBuilder.forPort(_port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
         LOGGER.info("Initialized QueryServer on port: {}", _port);
       }
       _queryRunner.start();
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index c21c40b2d9..694fb3c087 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -60,7 +60,7 @@ public class QueryDispatcherTest extends QueryTestSet {
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
-      QueryServer queryServer = Mockito.spy(new QueryServer(availablePort, 
queryRunner, null));
+      QueryServer queryServer = Mockito.spy(new QueryServer(availablePort, 
queryRunner));
       queryServer.start();
       _queryServerMap.put(availablePort, queryServer);
     }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index 7a14a2a4c6..3a0b23408e 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -76,7 +76,7 @@ public class QueryServerTest extends QueryTestSet {
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = mock(QueryRunner.class);
-      QueryServer queryServer = new QueryServer(availablePort, queryRunner, 
null);
+      QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer.start();
       _queryServerMap.put(availablePort, queryServer);
       _queryRunnerMap.put(availablePort, queryRunner);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 01f4402710..2a75ca7f5a 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -119,15 +119,15 @@ public class ServerInstance {
         TlsUtils.extractTlsConfig(serverConf.getPinotConfig(), 
CommonConstants.Server.SERVER_TLS_PREFIX);
     NettyConfig nettyConfig =
         NettyConfig.extractNettyConfig(serverConf.getPinotConfig(), 
CommonConstants.Server.SERVER_NETTY_PREFIX);
-    accessControlFactory.init(
-        
serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
 helixManager);
+    accessControlFactory
+        
.init(serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
+            helixManager);
     _accessControl = accessControlFactory.create();
 
     if (serverConf.isMultiStageServerEnabled()) {
       LOGGER.info("Initializing Multi-stage query engine");
-      _workerQueryServer =
-          new WorkerQueryServer(serverConf.getPinotConfig(), 
_instanceDataManager, helixManager, _serverMetrics,
-              serverConf.isNettyTlsServerEnabled() ? tlsConfig : null);
+      _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), 
_instanceDataManager, helixManager,
+          _serverMetrics);
     } else {
       _workerQueryServer = null;
     }
@@ -135,9 +135,9 @@ public class ServerInstance {
     if (serverConf.isNettyServerEnabled()) {
       int nettyPort = serverConf.getNettyPort();
       LOGGER.info("Initializing Netty query server on port: {}", nettyPort);
-      _instanceRequestHandler =
-          
ChannelHandlerFactory.getInstanceRequestHandler(helixManager.getInstanceName(), 
serverConf.getPinotConfig(),
-              _queryScheduler, _serverMetrics, new 
AllowAllAccessFactory().create());
+      _instanceRequestHandler = ChannelHandlerFactory
+          .getInstanceRequestHandler(helixManager.getInstanceName(), 
serverConf.getPinotConfig(), _queryScheduler,
+              _serverMetrics, new AllowAllAccessFactory().create());
       _nettyQueryServer = new QueryServer(nettyPort, nettyConfig, 
_instanceRequestHandler);
     } else {
       _nettyQueryServer = null;
@@ -146,9 +146,9 @@ public class ServerInstance {
     if (serverConf.isNettyTlsServerEnabled()) {
       int nettySecPort = serverConf.getNettyTlsPort();
       LOGGER.info("Initializing TLS-secured Netty query server on port: {}", 
nettySecPort);
-      _instanceRequestHandler =
-          
ChannelHandlerFactory.getInstanceRequestHandler(helixManager.getInstanceName(), 
serverConf.getPinotConfig(),
-              _queryScheduler, _serverMetrics, _accessControl);
+      _instanceRequestHandler = ChannelHandlerFactory
+          .getInstanceRequestHandler(helixManager.getInstanceName(), 
serverConf.getPinotConfig(), _queryScheduler,
+              _serverMetrics, _accessControl);
       _nettyTlsQueryServer = new QueryServer(nettySecPort, nettyConfig, 
tlsConfig, _instanceRequestHandler);
     } else {
       _nettyTlsQueryServer = null;
@@ -157,8 +157,9 @@ public class ServerInstance {
       int grpcPort = serverConf.getGrpcPort();
       LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
       _grpcQueryServer = new GrpcQueryServer(grpcPort, 
GrpcConfig.buildGrpcQueryConfig(serverConf.getPinotConfig()),
-          serverConf.isGrpcTlsServerEnabled() ? 
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
-              CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, 
_queryExecutor, _serverMetrics, _accessControl);
+          serverConf.isGrpcTlsServerEnabled() ? TlsUtils
+              .extractTlsConfig(serverConf.getPinotConfig(), 
CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null,
+          _queryExecutor, _serverMetrics, _accessControl);
     } else {
       _grpcQueryServer = null;
     }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 542cc9bd90..45db3208ec 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.server.worker;
 
 import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.query.runtime.QueryRunner;
@@ -38,20 +37,19 @@ public class WorkerQueryServer {
   private QueryRunner _queryRunner;
   private InstanceDataManager _instanceDataManager;
   private ServerMetrics _serverMetrics;
-  private TlsConfig _tlsConfig;
 
   public WorkerQueryServer(PinotConfiguration configuration, 
InstanceDataManager instanceDataManager,
-      HelixManager helixManager, ServerMetrics serverMetrics, TlsConfig 
tlsConfig) {
+      HelixManager helixManager, ServerMetrics serverMetrics) {
     _configuration = toWorkerQueryConfig(configuration);
     _helixManager = helixManager;
     _instanceDataManager = instanceDataManager;
-    _tlsConfig = tlsConfig;
     _serverMetrics = serverMetrics;
-    _queryServicePort = 
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
-        CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
+    _queryServicePort =
+        
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
+            CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
     _queryRunner = new QueryRunner();
     _queryRunner.init(_configuration, _instanceDataManager, _helixManager, 
_serverMetrics);
-    _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner, 
_tlsConfig);
+    _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner);
   }
 
   private static PinotConfiguration toWorkerQueryConfig(PinotConfiguration 
configuration) {
@@ -64,15 +62,17 @@ public class WorkerQueryServer {
           CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceId;
       
newConfig.addProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME,
 hostname);
     }
-    int runnerPort = 
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+    int runnerPort = newConfig.getProperty(
+        CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
         CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
     if (runnerPort == -1) {
       runnerPort =
           newConfig.getProperty(CommonConstants.Server.CONFIG_OF_GRPC_PORT, 
CommonConstants.Server.DEFAULT_GRPC_PORT);
       
newConfig.addProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
 runnerPort);
     }
-    int servicePort = 
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
-        CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
+    int servicePort =
+        
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
+            CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
     if (servicePort == -1) {
       servicePort = 
newConfig.getProperty(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT,
           CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to