This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 736733d553 Reverting https://github.com/apache/pinot/pull/13645 PR (#14259) 736733d553 is described below commit 736733d5536307de4d27cc547a87b8cc9ab1ee85 Author: soumitra-st <127247229+soumitra...@users.noreply.github.com> AuthorDate: Tue Oct 22 10:56:16 2024 -0700 Reverting https://github.com/apache/pinot/pull/13645 PR (#14259) --- .../pinot/common/utils/grpc/GrpcQueryClient.java | 2 +- .../pinot/core/transport/grpc/GrpcQueryServer.java | 2 +- .../query/service/dispatch/DispatchClient.java | 16 +------------ .../pinot/query/service/server/QueryServer.java | 16 ++----------- .../service/dispatch/QueryDispatcherTest.java | 2 +- .../query/service/server/QueryServerTest.java | 2 +- .../pinot/server/starter/ServerInstance.java | 27 +++++++++++----------- .../pinot/server/worker/WorkerQueryServer.java | 20 ++++++++-------- 8 files changed, 31 insertions(+), 56 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java index 808e8d0e02..9987876b95 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java @@ -84,7 +84,7 @@ public class GrpcQueryClient implements Closeable { _channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond(); } - public static SslContext buildSslContext(TlsConfig tlsConfig) { + private SslContext buildSslContext(TlsConfig tlsConfig) { LOGGER.info("Building gRPC SSL context"); SslContext sslContext = CLIENT_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(), tlsConfigHashCode -> { try { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java index 4d8608c5ea..daae6d74cc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java @@ -119,7 +119,7 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa ResourceManager.DEFAULT_QUERY_WORKER_THREADS); } - public static SslContext buildGRpcSslContext(TlsConfig tlsConfig) + private SslContext buildGRpcSslContext(TlsConfig tlsConfig) throws IllegalArgumentException { LOGGER.info("Building gRPC SSL context"); if (tlsConfig.getKeyStorePath() == null) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java index 4665b23a48..cbb8be1e6c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java @@ -21,15 +21,11 @@ package org.apache.pinot.query.service.dispatch; import io.grpc.Deadline; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.StreamObserver; -import java.util.Collections; import java.util.List; import java.util.function.Consumer; -import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; -import org.apache.pinot.common.utils.grpc.GrpcQueryClient; import org.apache.pinot.query.routing.QueryServerInstance; @@ -46,17 +42,7 @@ class DispatchClient { private final PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub; public DispatchClient(String host, int port) { - this(host, port, new GrpcConfig(Collections.emptyMap())); - } - - public DispatchClient(String host, int port, GrpcConfig grpcConfig) { - if (grpcConfig.isUsePlainText()) { - _channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); - } else { - _channel = - NettyChannelBuilder.forAddress(host, port) - .sslContext(GrpcQueryClient.buildSslContext(grpcConfig.getTlsConfig())).build(); - } + _channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); _dispatchStub = PinotQueryWorkerGrpc.newStub(_channel); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java index 36104011f3..d8a1ecfef8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java @@ -21,7 +21,6 @@ package org.apache.pinot.query.service.server; import com.google.protobuf.ByteString; import io.grpc.Server; import io.grpc.ServerBuilder; -import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; @@ -34,12 +33,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Consumer; -import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.common.utils.NamedThreadFactory; -import org.apache.pinot.core.transport.grpc.GrpcQueryServer; import org.apache.pinot.query.planner.serde.PlanNodeSerializer; import org.apache.pinot.query.routing.QueryPlanSerDeUtils; import org.apache.pinot.query.routing.StageMetadata; @@ -65,7 +62,6 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { private final int _port; private final QueryRunner _queryRunner; - private final TlsConfig _tlsConfig; // query submission service is only used for plan submission for now. // TODO: with complex query submission logic we should allow asynchronous query submission return instead of // directly return from submission response observer. @@ -73,10 +69,9 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { private Server _server = null; - public QueryServer(int port, QueryRunner queryRunner, TlsConfig tlsConfig) { + public QueryServer(int port, QueryRunner queryRunner) { _port = port; _queryRunner = queryRunner; - _tlsConfig = tlsConfig; _querySubmissionExecutorService = Executors.newCachedThreadPool(new NamedThreadFactory("query_submission_executor_on_" + _port + "_port")); } @@ -85,14 +80,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { LOGGER.info("Starting QueryServer"); try { if (_server == null) { - if (_tlsConfig == null) { - _server = ServerBuilder.forPort(_port).addService(this) - .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build(); - } else { - _server = NettyServerBuilder.forPort(_port).addService(this) - .sslContext(GrpcQueryServer.buildGRpcSslContext(_tlsConfig)) - .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build(); - } + _server = ServerBuilder.forPort(_port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build(); LOGGER.info("Initialized QueryServer on port: {}", _port); } _queryRunner.start(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java index c21c40b2d9..694fb3c087 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java @@ -60,7 +60,7 @@ public class QueryDispatcherTest extends QueryTestSet { for (int i = 0; i < QUERY_SERVER_COUNT; i++) { int availablePort = QueryTestUtils.getAvailablePort(); QueryRunner queryRunner = Mockito.mock(QueryRunner.class); - QueryServer queryServer = Mockito.spy(new QueryServer(availablePort, queryRunner, null)); + QueryServer queryServer = Mockito.spy(new QueryServer(availablePort, queryRunner)); queryServer.start(); _queryServerMap.put(availablePort, queryServer); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java index 7a14a2a4c6..3a0b23408e 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java @@ -76,7 +76,7 @@ public class QueryServerTest extends QueryTestSet { for (int i = 0; i < QUERY_SERVER_COUNT; i++) { int availablePort = QueryTestUtils.getAvailablePort(); QueryRunner queryRunner = mock(QueryRunner.class); - QueryServer queryServer = new QueryServer(availablePort, queryRunner, null); + QueryServer queryServer = new QueryServer(availablePort, queryRunner); queryServer.start(); _queryServerMap.put(availablePort, queryServer); _queryRunnerMap.put(availablePort, queryRunner); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java index 01f4402710..2a75ca7f5a 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java @@ -119,15 +119,15 @@ public class ServerInstance { TlsUtils.extractTlsConfig(serverConf.getPinotConfig(), CommonConstants.Server.SERVER_TLS_PREFIX); NettyConfig nettyConfig = NettyConfig.extractNettyConfig(serverConf.getPinotConfig(), CommonConstants.Server.SERVER_NETTY_PREFIX); - accessControlFactory.init( - serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL), helixManager); + accessControlFactory + .init(serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL), + helixManager); _accessControl = accessControlFactory.create(); if (serverConf.isMultiStageServerEnabled()) { LOGGER.info("Initializing Multi-stage query engine"); - _workerQueryServer = - new WorkerQueryServer(serverConf.getPinotConfig(), _instanceDataManager, helixManager, _serverMetrics, - serverConf.isNettyTlsServerEnabled() ? tlsConfig : null); + _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), _instanceDataManager, helixManager, + _serverMetrics); } else { _workerQueryServer = null; } @@ -135,9 +135,9 @@ public class ServerInstance { if (serverConf.isNettyServerEnabled()) { int nettyPort = serverConf.getNettyPort(); LOGGER.info("Initializing Netty query server on port: {}", nettyPort); - _instanceRequestHandler = - ChannelHandlerFactory.getInstanceRequestHandler(helixManager.getInstanceName(), serverConf.getPinotConfig(), - _queryScheduler, _serverMetrics, new AllowAllAccessFactory().create()); + _instanceRequestHandler = ChannelHandlerFactory + .getInstanceRequestHandler(helixManager.getInstanceName(), serverConf.getPinotConfig(), _queryScheduler, + _serverMetrics, new AllowAllAccessFactory().create()); _nettyQueryServer = new QueryServer(nettyPort, nettyConfig, _instanceRequestHandler); } else { _nettyQueryServer = null; @@ -146,9 +146,9 @@ public class ServerInstance { if (serverConf.isNettyTlsServerEnabled()) { int nettySecPort = serverConf.getNettyTlsPort(); LOGGER.info("Initializing TLS-secured Netty query server on port: {}", nettySecPort); - _instanceRequestHandler = - ChannelHandlerFactory.getInstanceRequestHandler(helixManager.getInstanceName(), serverConf.getPinotConfig(), - _queryScheduler, _serverMetrics, _accessControl); + _instanceRequestHandler = ChannelHandlerFactory + .getInstanceRequestHandler(helixManager.getInstanceName(), serverConf.getPinotConfig(), _queryScheduler, + _serverMetrics, _accessControl); _nettyTlsQueryServer = new QueryServer(nettySecPort, nettyConfig, tlsConfig, _instanceRequestHandler); } else { _nettyTlsQueryServer = null; @@ -157,8 +157,9 @@ public class ServerInstance { int grpcPort = serverConf.getGrpcPort(); LOGGER.info("Initializing gRPC query server on port: {}", grpcPort); _grpcQueryServer = new GrpcQueryServer(grpcPort, GrpcConfig.buildGrpcQueryConfig(serverConf.getPinotConfig()), - serverConf.isGrpcTlsServerEnabled() ? TlsUtils.extractTlsConfig(serverConf.getPinotConfig(), - CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, _queryExecutor, _serverMetrics, _accessControl); + serverConf.isGrpcTlsServerEnabled() ? TlsUtils + .extractTlsConfig(serverConf.getPinotConfig(), CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, + _queryExecutor, _serverMetrics, _accessControl); } else { _grpcQueryServer = null; } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java index 542cc9bd90..45db3208ec 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java @@ -19,7 +19,6 @@ package org.apache.pinot.server.worker; import org.apache.helix.HelixManager; -import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.query.runtime.QueryRunner; @@ -38,20 +37,19 @@ public class WorkerQueryServer { private QueryRunner _queryRunner; private InstanceDataManager _instanceDataManager; private ServerMetrics _serverMetrics; - private TlsConfig _tlsConfig; public WorkerQueryServer(PinotConfiguration configuration, InstanceDataManager instanceDataManager, - HelixManager helixManager, ServerMetrics serverMetrics, TlsConfig tlsConfig) { + HelixManager helixManager, ServerMetrics serverMetrics) { _configuration = toWorkerQueryConfig(configuration); _helixManager = helixManager; _instanceDataManager = instanceDataManager; - _tlsConfig = tlsConfig; _serverMetrics = serverMetrics; - _queryServicePort = _configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT, - CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT); + _queryServicePort = + _configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT, + CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT); _queryRunner = new QueryRunner(); _queryRunner.init(_configuration, _instanceDataManager, _helixManager, _serverMetrics); - _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner, _tlsConfig); + _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner); } private static PinotConfiguration toWorkerQueryConfig(PinotConfiguration configuration) { @@ -64,15 +62,17 @@ public class WorkerQueryServer { CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceId; newConfig.addProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, hostname); } - int runnerPort = newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, + int runnerPort = newConfig.getProperty( + CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT); if (runnerPort == -1) { runnerPort = newConfig.getProperty(CommonConstants.Server.CONFIG_OF_GRPC_PORT, CommonConstants.Server.DEFAULT_GRPC_PORT); newConfig.addProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, runnerPort); } - int servicePort = newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT, - CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT); + int servicePort = + newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT, + CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT); if (servicePort == -1) { servicePort = newConfig.getProperty(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org