This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new bfb604a883 fix grpc query server not setting max inbound msg size (#9126) bfb604a883 is described below commit bfb604a883a1b63f85fce35fb8fb559cd073a728 Author: Rong Rong <walterddr.walter...@gmail.com> AuthorDate: Mon Aug 1 16:14:38 2022 -0700 fix grpc query server not setting max inbound msg size (#9126) * fix grpc query server not setting max inbound msg size * fix compilation issue for non-default enable profiles * adjusting config definition to allow pinotconfig to be populated into grpc config Co-authored-by: Rong Rong <ro...@startree.ai> --- .../requesthandler/GrpcBrokerRequestHandler.java | 14 ++-- .../org/apache/pinot/common/config/GrpcConfig.java | 77 ++++++++++++++++++++++ .../pinot/common/utils/grpc/GrpcQueryClient.java | 61 ++--------------- .../presto/grpc/PinotStreamingQueryClient.java | 5 +- .../pinot/core/transport/grpc/GrpcQueryServer.java | 9 ++- .../OfflineSecureGRPCServerIntegrationTest.java | 5 +- .../pinot/server/starter/ServerInstance.java | 3 +- 7 files changed, 101 insertions(+), 73 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 0b3744f018..9c30bacbe7 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -28,6 +28,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; +import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.common.metrics.BrokerMetrics; @@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory; public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { private static final Logger LOGGER = LoggerFactory.getLogger(GrpcBrokerRequestHandler.class); - private final GrpcQueryClient.Config _grpcConfig; + private final GrpcConfig _grpcConfig; private final StreamingReduceService _streamingReduceService; private final PinotStreamingQueryClient _streamingQueryClient; @@ -63,7 +64,7 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { BrokerMetrics brokerMetrics, TlsConfig tlsConfig) { super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); LOGGER.info("Using Grpc BrokerRequestHandler."); - _grpcConfig = buildGrpcQueryClientConfig(config); + _grpcConfig = GrpcConfig.buildGrpcQueryConfig(config); // create streaming query client _streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig); @@ -125,16 +126,11 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { } } - // return empty config for now - private GrpcQueryClient.Config buildGrpcQueryClientConfig(PinotConfiguration config) { - return new GrpcQueryClient.Config(); - } - public static class PinotStreamingQueryClient { private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new ConcurrentHashMap<>(); - private final GrpcQueryClient.Config _config; + private final GrpcConfig _config; - public PinotStreamingQueryClient(GrpcQueryClient.Config config) { + public PinotStreamingQueryClient(GrpcConfig config) { _config = config; } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java new file mode 100644 index 0000000000..3a5c8cdf9e --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.config; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.pinot.common.utils.TlsUtils; +import org.apache.pinot.spi.env.PinotConfiguration; + + +public class GrpcConfig { + public static final String GRPC_TLS_PREFIX = "tls"; + public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText"; + public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE = "maxInboundMessageSizeBytes"; + // Default max message size to 128MB + public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 * 1024; + // Default use plain text for transport + private static final String DEFAULT_IS_USE_PLAIN_TEXT = "true"; + + private final TlsConfig _tlsConfig; + private final PinotConfiguration _pinotConfig; + + public static GrpcConfig buildGrpcQueryConfig(PinotConfiguration pinotConfig) { + return new GrpcConfig(pinotConfig); + } + + public GrpcConfig(PinotConfiguration pinotConfig) { + _pinotConfig = pinotConfig; + _tlsConfig = TlsUtils.extractTlsConfig(_pinotConfig, GRPC_TLS_PREFIX); + } + + public GrpcConfig(Map<String, Object> configMap) { + this(new PinotConfiguration(configMap)); + } + + public GrpcConfig(int maxInboundMessageSizeBytes, boolean usePlainText) { + this(ImmutableMap.of(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, maxInboundMessageSizeBytes, CONFIG_USE_PLAIN_TEXT, + usePlainText)); + } + + // Allow get customized configs. + public Object get(String key) { + return _pinotConfig.getProperty(key); + } + + public int getMaxInboundMessageSizeBytes() { + return _pinotConfig.getProperty(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE); + } + + public boolean isUsePlainText() { + return Boolean.parseBoolean(_pinotConfig.getProperty(CONFIG_USE_PLAIN_TEXT, DEFAULT_IS_USE_PLAIN_TEXT)); + } + + public TlsConfig getTlsConfig() { + return _tlsConfig; + } + + public PinotConfiguration getPinotConfig() { + return _pinotConfig; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java index df074d2581..88611f427d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java @@ -18,24 +18,22 @@ */ package org.apache.pinot.common.utils.grpc; -import com.google.common.collect.ImmutableMap; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider; +import java.util.Collections; import java.util.Iterator; -import java.util.Map; import java.util.concurrent.TimeUnit; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLException; import javax.net.ssl.TrustManagerFactory; -import org.apache.pinot.common.config.TlsConfig; +import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.proto.PinotQueryServerGrpc; import org.apache.pinot.common.proto.Server; import org.apache.pinot.common.utils.TlsUtils; -import org.apache.pinot.spi.env.PinotConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,10 +46,10 @@ public class GrpcQueryClient { private final PinotQueryServerGrpc.PinotQueryServerBlockingStub _blockingStub; public GrpcQueryClient(String host, int port) { - this(host, port, new Config()); + this(host, port, new GrpcConfig(Collections.emptyMap())); } - public GrpcQueryClient(String host, int port, Config config) { + public GrpcQueryClient(String host, int port, GrpcConfig config) { if (config.isUsePlainText()) { _managedChannel = ManagedChannelBuilder.forAddress(host, port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()) @@ -99,55 +97,4 @@ public class GrpcQueryClient { } } } - - public static class Config { - public static final String GRPC_TLS_PREFIX = "tls"; - public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText"; - public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE = "maxInboundMessageSizeBytes"; - // Default max message size to 128MB - public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 * 1024; - - private final int _maxInboundMessageSizeBytes; - private final boolean _usePlainText; - private final TlsConfig _tlsConfig; - private final PinotConfiguration _pinotConfig; - - public Config() { - this(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE, true); - } - - public Config(int maxInboundMessageSizeBytes, boolean usePlainText) { - this(ImmutableMap.of(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, maxInboundMessageSizeBytes, CONFIG_USE_PLAIN_TEXT, - usePlainText)); - } - - public Config(Map<String, Object> configMap) { - _pinotConfig = new PinotConfiguration(configMap); - _maxInboundMessageSizeBytes = - _pinotConfig.getProperty(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE); - _usePlainText = Boolean.valueOf(configMap.get(CONFIG_USE_PLAIN_TEXT).toString()); - _tlsConfig = TlsUtils.extractTlsConfig(_pinotConfig, GRPC_TLS_PREFIX); - } - - // Allow get customized configs. - public Object get(String key) { - return _pinotConfig.getProperty(key); - } - - public int getMaxInboundMessageSizeBytes() { - return _maxInboundMessageSizeBytes; - } - - public boolean isUsePlainText() { - return _usePlainText; - } - - public TlsConfig getTlsConfig() { - return _tlsConfig; - } - - public PinotConfiguration getPinotConfig() { - return _pinotConfig; - } - } } diff --git a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java index a5658bba5d..f6a9ccceb3 100644 --- a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java +++ b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java @@ -21,6 +21,7 @@ package org.apache.pinot.connector.presto.grpc; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.proto.Server; import org.apache.pinot.common.utils.grpc.GrpcQueryClient; import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; @@ -31,9 +32,9 @@ import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; */ public class PinotStreamingQueryClient { private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new HashMap<>(); - private final GrpcQueryClient.Config _config; + private final GrpcConfig _config; - public PinotStreamingQueryClient(GrpcQueryClient.Config config) { + public PinotStreamingQueryClient(GrpcConfig config) { _config = config; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java index 6115244641..bf8f2aa1c9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java @@ -31,6 +31,7 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; @@ -60,13 +61,15 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS); private final AccessControl _accessControl; - public GrpcQueryServer(int port, TlsConfig tlsConfig, QueryExecutor queryExecutor, ServerMetrics serverMetrics, - AccessControl accessControl) { + public GrpcQueryServer(int port, GrpcConfig config, TlsConfig tlsConfig, QueryExecutor queryExecutor, + ServerMetrics serverMetrics, AccessControl accessControl) { _queryExecutor = queryExecutor; _serverMetrics = serverMetrics; if (tlsConfig != null) { try { - _server = NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig)).addService(this).build(); + _server = NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig)) + .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()) + .addService(this).build(); } catch (Exception e) { throw new RuntimeException("Failed to start secure grpcQueryServer", e); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java index 8df08e4fdf..48d6d3b3b7 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java @@ -21,6 +21,7 @@ package org.apache.pinot.integration.tests; import java.net.URL; import java.util.HashMap; import java.util.Map; +import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.utils.grpc.GrpcQueryClient; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants.Server; @@ -56,7 +57,9 @@ public class OfflineSecureGRPCServerIntegrationTest extends OfflineGRPCServerInt configMap.put("tls.truststore.password", PASSWORD); configMap.put("tls.truststore.type", JKS); configMap.put("tls.ssl.provider", JDK); - GrpcQueryClient.Config config = new GrpcQueryClient.Config(configMap); + PinotConfiguration brokerConfig = new PinotConfiguration(configMap); + // This mimics how pinot broker instantiates GRPCQueryClient. + GrpcConfig config = GrpcConfig.buildGrpcQueryConfig(brokerConfig); return new GrpcQueryClient("localhost", Server.DEFAULT_GRPC_PORT, config); } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java index ec1a32b3bf..7ed0a57989 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.LongAccumulator; import org.apache.helix.HelixManager; +import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.config.NettyConfig; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.function.FunctionRegistry; @@ -132,7 +133,7 @@ public class ServerInstance { if (serverConf.isEnableGrpcServer()) { int grpcPort = serverConf.getGrpcPort(); LOGGER.info("Initializing gRPC query server on port: {}", grpcPort); - _grpcQueryServer = new GrpcQueryServer(grpcPort, + _grpcQueryServer = new GrpcQueryServer(grpcPort, GrpcConfig.buildGrpcQueryConfig(serverConf.getPinotConfig()), serverConf.isGrpcTlsServerEnabled() ? TlsUtils.extractTlsConfig(serverConf.getPinotConfig(), CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, _queryExecutor, _serverMetrics, _accessControl); } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org