This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bfb604a883 fix grpc query server not setting max inbound msg size (#9126)
bfb604a883 is described below

commit bfb604a883a1b63f85fce35fb8fb559cd073a728
Author: Rong Rong <walterddr.walter...@gmail.com>
AuthorDate: Mon Aug 1 16:14:38 2022 -0700

    fix grpc query server not setting max inbound msg size (#9126)
    
    * fix grpc query server not setting max inbound msg size
    * fix compilation issue for non-default enable profiles
    * adjusting config definition to allow pinotconfig to be populated into grpc config
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../requesthandler/GrpcBrokerRequestHandler.java   | 14 ++--
 .../org/apache/pinot/common/config/GrpcConfig.java | 77 ++++++++++++++++++++++
 .../pinot/common/utils/grpc/GrpcQueryClient.java   | 61 ++---------------
 .../presto/grpc/PinotStreamingQueryClient.java     |  5 +-
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  9 ++-
 .../OfflineSecureGRPCServerIntegrationTest.java    |  5 +-
 .../pinot/server/starter/ServerInstance.java       |  3 +-
 7 files changed, 101 insertions(+), 73 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 0b3744f018..9c30bacbe7 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
 public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcBrokerRequestHandler.class);
 
-  private final GrpcQueryClient.Config _grpcConfig;
+  private final GrpcConfig _grpcConfig;
   private final StreamingReduceService _streamingReduceService;
   private final PinotStreamingQueryClient _streamingQueryClient;
 
@@ -63,7 +64,7 @@ public class GrpcBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
     super(config, routingManager, accessControlFactory, queryQuotaManager, 
tableCache, brokerMetrics);
     LOGGER.info("Using Grpc BrokerRequestHandler.");
-    _grpcConfig = buildGrpcQueryClientConfig(config);
+    _grpcConfig = GrpcConfig.buildGrpcQueryConfig(config);
 
     // create streaming query client
     _streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig);
@@ -125,16 +126,11 @@ public class GrpcBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     }
   }
 
-  // return empty config for now
-  private GrpcQueryClient.Config buildGrpcQueryClientConfig(PinotConfiguration 
config) {
-    return new GrpcQueryClient.Config();
-  }
-
   public static class PinotStreamingQueryClient {
     private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new 
ConcurrentHashMap<>();
-    private final GrpcQueryClient.Config _config;
+    private final GrpcConfig _config;
 
-    public PinotStreamingQueryClient(GrpcQueryClient.Config config) {
+    public PinotStreamingQueryClient(GrpcConfig config) {
       _config = config;
     }
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java 
b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
new file mode 100644
index 0000000000..3a5c8cdf9e
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.pinot.common.utils.TlsUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public class GrpcConfig {
+  public static final String GRPC_TLS_PREFIX = "tls";
+  public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText";
+  public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE = 
"maxInboundMessageSizeBytes";
+  // Default max message size to 128MB
+  public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 
* 1024;
+  // Default use plain text for transport
+  private static final String DEFAULT_IS_USE_PLAIN_TEXT = "true";
+
+  private final TlsConfig _tlsConfig;
+  private final PinotConfiguration _pinotConfig;
+
+  public static GrpcConfig buildGrpcQueryConfig(PinotConfiguration 
pinotConfig) {
+    return new GrpcConfig(pinotConfig);
+  }
+
+  public GrpcConfig(PinotConfiguration pinotConfig) {
+    _pinotConfig = pinotConfig;
+    _tlsConfig = TlsUtils.extractTlsConfig(_pinotConfig, GRPC_TLS_PREFIX);
+  }
+
+  public GrpcConfig(Map<String, Object> configMap) {
+    this(new PinotConfiguration(configMap));
+  }
+
+  public GrpcConfig(int maxInboundMessageSizeBytes, boolean usePlainText) {
+    this(ImmutableMap.of(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, 
maxInboundMessageSizeBytes, CONFIG_USE_PLAIN_TEXT,
+        usePlainText));
+  }
+
+  // Allow get customized configs.
+  public Object get(String key) {
+    return _pinotConfig.getProperty(key);
+  }
+
+  public int getMaxInboundMessageSizeBytes() {
+    return _pinotConfig.getProperty(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, 
DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE);
+  }
+
+  public boolean isUsePlainText() {
+    return 
Boolean.parseBoolean(_pinotConfig.getProperty(CONFIG_USE_PLAIN_TEXT, 
DEFAULT_IS_USE_PLAIN_TEXT));
+  }
+
+  public TlsConfig getTlsConfig() {
+    return _tlsConfig;
+  }
+
+  public PinotConfiguration getPinotConfig() {
+    return _pinotConfig;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index df074d2581..88611f427d 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -18,24 +18,22 @@
  */
 package org.apache.pinot.common.utils.grpc;
 
-import com.google.common.collect.ImmutableMap;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLException;
 import javax.net.ssl.TrustManagerFactory;
-import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.TlsUtils;
-import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,10 +46,10 @@ public class GrpcQueryClient {
   private final PinotQueryServerGrpc.PinotQueryServerBlockingStub 
_blockingStub;
 
   public GrpcQueryClient(String host, int port) {
-    this(host, port, new Config());
+    this(host, port, new GrpcConfig(Collections.emptyMap()));
   }
 
-  public GrpcQueryClient(String host, int port, Config config) {
+  public GrpcQueryClient(String host, int port, GrpcConfig config) {
     if (config.isUsePlainText()) {
       _managedChannel =
           ManagedChannelBuilder.forAddress(host, 
port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
@@ -99,55 +97,4 @@ public class GrpcQueryClient {
       }
     }
   }
-
-  public static class Config {
-    public static final String GRPC_TLS_PREFIX = "tls";
-    public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText";
-    public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE = 
"maxInboundMessageSizeBytes";
-    // Default max message size to 128MB
-    public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 
1024 * 1024;
-
-    private final int _maxInboundMessageSizeBytes;
-    private final boolean _usePlainText;
-    private final TlsConfig _tlsConfig;
-    private final PinotConfiguration _pinotConfig;
-
-    public Config() {
-      this(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE, true);
-    }
-
-    public Config(int maxInboundMessageSizeBytes, boolean usePlainText) {
-      this(ImmutableMap.of(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, 
maxInboundMessageSizeBytes, CONFIG_USE_PLAIN_TEXT,
-          usePlainText));
-    }
-
-    public Config(Map<String, Object> configMap) {
-      _pinotConfig = new PinotConfiguration(configMap);
-      _maxInboundMessageSizeBytes =
-          _pinotConfig.getProperty(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, 
DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE);
-      _usePlainText = 
Boolean.valueOf(configMap.get(CONFIG_USE_PLAIN_TEXT).toString());
-      _tlsConfig = TlsUtils.extractTlsConfig(_pinotConfig, GRPC_TLS_PREFIX);
-    }
-
-    // Allow get customized configs.
-    public Object get(String key) {
-      return _pinotConfig.getProperty(key);
-    }
-
-    public int getMaxInboundMessageSizeBytes() {
-      return _maxInboundMessageSizeBytes;
-    }
-
-    public boolean isUsePlainText() {
-      return _usePlainText;
-    }
-
-    public TlsConfig getTlsConfig() {
-      return _tlsConfig;
-    }
-
-    public PinotConfiguration getPinotConfig() {
-      return _pinotConfig;
-    }
-  }
 }
diff --git 
a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
 
b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
index a5658bba5d..f6a9ccceb3 100644
--- 
a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
+++ 
b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
@@ -21,6 +21,7 @@ package org.apache.pinot.connector.presto.grpc;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
@@ -31,9 +32,9 @@ import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
  */
 public class PinotStreamingQueryClient {
   private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new 
HashMap<>();
-  private final GrpcQueryClient.Config _config;
+  private final GrpcConfig _config;
 
-  public PinotStreamingQueryClient(GrpcQueryClient.Config config) {
+  public PinotStreamingQueryClient(GrpcConfig config) {
     _config = config;
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 6115244641..bf8f2aa1c9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -31,6 +31,7 @@ import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -60,13 +61,15 @@ public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBa
       
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
   private final AccessControl _accessControl;
 
-  public GrpcQueryServer(int port, TlsConfig tlsConfig, QueryExecutor 
queryExecutor, ServerMetrics serverMetrics,
-      AccessControl accessControl) {
+  public GrpcQueryServer(int port, GrpcConfig config, TlsConfig tlsConfig, 
QueryExecutor queryExecutor,
+      ServerMetrics serverMetrics, AccessControl accessControl) {
     _queryExecutor = queryExecutor;
     _serverMetrics = serverMetrics;
     if (tlsConfig != null) {
       try {
-        _server = 
NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig)).addService(this).build();
+        _server = 
NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig))
+            .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
+            .addService(this).build();
       } catch (Exception e) {
         throw new RuntimeException("Failed to start secure grpcQueryServer", 
e);
       }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
index 8df08e4fdf..48d6d3b3b7 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.integration.tests;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
@@ -56,7 +57,9 @@ public class OfflineSecureGRPCServerIntegrationTest extends 
OfflineGRPCServerInt
     configMap.put("tls.truststore.password", PASSWORD);
     configMap.put("tls.truststore.type", JKS);
     configMap.put("tls.ssl.provider", JDK);
-    GrpcQueryClient.Config config = new GrpcQueryClient.Config(configMap);
+    PinotConfiguration brokerConfig = new PinotConfiguration(configMap);
+    // This mimics how pinot broker instantiates GRPCQueryClient.
+    GrpcConfig config = GrpcConfig.buildGrpcQueryConfig(brokerConfig);
     return new GrpcQueryClient("localhost", Server.DEFAULT_GRPC_PORT, config);
   }
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index ec1a32b3bf..7ed0a57989 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.LongAccumulator;
 import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.config.NettyConfig;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.function.FunctionRegistry;
@@ -132,7 +133,7 @@ public class ServerInstance {
     if (serverConf.isEnableGrpcServer()) {
       int grpcPort = serverConf.getGrpcPort();
       LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
-      _grpcQueryServer = new GrpcQueryServer(grpcPort,
+      _grpcQueryServer = new GrpcQueryServer(grpcPort, 
GrpcConfig.buildGrpcQueryConfig(serverConf.getPinotConfig()),
           serverConf.isGrpcTlsServerEnabled() ? 
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
               CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, 
_queryExecutor, _serverMetrics, _accessControl);
     } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to