This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d0426e9864 Put Netty Channel handlers and Tls context creation logic at same place (#8163)
d0426e9864 is described below

commit d0426e9864a1ab6b7b8407603f19b7a30503e46a
Author: Liang Mingqiang <secret.mqli...@gmail.com>
AuthorDate: Mon Aug 1 17:45:42 2022 -0700

    Put Netty Channel handlers and Tls context creation logic at same place (#8163)
---
 .../org/apache/pinot/common/utils/TlsUtils.java    | 49 ++++++++++++
 .../core/transport/ChannelHandlerFactory.java      | 89 ++++++++++++++++++++++
 .../apache/pinot/core/transport/QueryServer.java   | 48 +++---------
 .../pinot/core/transport/ServerChannels.java       | 40 +++-------
 4 files changed, 156 insertions(+), 70 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java
index 404d03a127..0c444c2d68 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java
@@ -19,6 +19,10 @@
 package org.apache.pinot.common.utils;
 
 import com.google.common.base.Preconditions;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
@@ -339,4 +343,49 @@ public final class TlsUtils {
       return _sslSocketFactory.createSocket(host, port);
     }
   }
+
+  /**
+   * Builds client side SslContext based on a given TlsConfig.
+   *
+   * @param tlsConfig TLS config
+   */
+  public static SslContext buildClientContext(TlsConfig tlsConfig) {
+    SslContextBuilder sslContextBuilder =
+        
SslContextBuilder.forClient().sslProvider(SslProvider.valueOf(tlsConfig.getSslProvider()));
+    if (tlsConfig.getKeyStorePath() != null) {
+      sslContextBuilder.keyManager(createKeyManagerFactory(tlsConfig));
+    }
+    if (tlsConfig.getTrustStorePath() != null) {
+      sslContextBuilder.trustManager(createTrustManagerFactory(tlsConfig));
+    }
+    try {
+      return sslContextBuilder.build();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Builds server side SslContext based on a given TlsConfig.
+   *
+   * @param tlsConfig TLS config
+   */
+  public static SslContext buildServerContext(TlsConfig tlsConfig) {
+    if (tlsConfig.getKeyStorePath() == null) {
+      throw new IllegalArgumentException("Must provide key store path for 
secured server");
+    }
+    SslContextBuilder sslContextBuilder = 
SslContextBuilder.forServer(createKeyManagerFactory(tlsConfig))
+        .sslProvider(SslProvider.valueOf(tlsConfig.getSslProvider()));
+    if (tlsConfig.getTrustStorePath() != null) {
+      sslContextBuilder.trustManager(createTrustManagerFactory(tlsConfig));
+    }
+    if (tlsConfig.isClientAuthEnabled()) {
+      sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+    }
+    try {
+      return sslContextBuilder.build();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java
new file mode 100644
index 0000000000..c62c266af5
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.TlsUtils;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
+import org.apache.pinot.server.access.AccessControl;
+
+
+/**
+ * The {@code ChannelHandlerFactory} provides all kinds of Netty 
ChannelHandlers
+ */
+public class ChannelHandlerFactory {
+
+  public static final String SSL = "ssl";
+
+  private ChannelHandlerFactory() {
+  }
+
+  /**
+   * The {@code getLengthFieldBasedFrameDecoder} return a decoder 
ChannelHandler that splits the received ByteBuffers
+   * dynamically by the value of the length field in the message
+   */
+  public static ChannelHandler getLengthFieldBasedFrameDecoder() {
+    return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 
Integer.BYTES, 0, Integer.BYTES);
+  }
+
+  /**
+   * The {@code getLengthFieldPrepender} return an encoder ChannelHandler that 
prepends the length of the message.
+   */
+  public static ChannelHandler getLengthFieldPrepender() {
+    return new LengthFieldPrepender(Integer.BYTES);
+  }
+
+  /**
+   * The {@code getClientTlsHandler} return a Client side Tls handler that 
encrypt and decrypt everything.
+   */
+  public static ChannelHandler getClientTlsHandler(TlsConfig tlsConfig, 
SocketChannel ch) {
+    return TlsUtils.buildClientContext(tlsConfig).newHandler(ch.alloc());
+  }
+
+  /**
+   * The {@code getServerTlsHandler} return a Server side Tls handler that 
encrypt and decrypt everything.
+   */
+  public static ChannelHandler getServerTlsHandler(TlsConfig tlsConfig, 
SocketChannel ch) {
+    return TlsUtils.buildServerContext(tlsConfig).newHandler(ch.alloc());
+  }
+
+  /**
+   * The {@code getDataTableHandler} return a {@code DataTableHandler} Netty 
inbound handler on Pinot Broker side to
+   * handle the serialized data table responses sent from Pinot Server.
+   */
+  public static ChannelHandler getDataTableHandler(QueryRouter queryRouter, 
ServerRoutingInstance serverRoutingInstance,
+      BrokerMetrics brokerMetrics) {
+    return new DataTableHandler(queryRouter, serverRoutingInstance, 
brokerMetrics);
+  }
+
+  /**
+   * The {@code getInstanceRequestHandler} return a {@code 
InstanceRequestHandler} Netty inbound handler on Pinot
+   * Server side to handle the serialized instance requests sent from Pinot 
Broker.
+   */
+  public static ChannelHandler getInstanceRequestHandler(QueryScheduler 
queryScheduler, ServerMetrics serverMetrics,
+      AccessControl accessControl) {
+    return new InstanceRequestHandler(queryScheduler, serverMetrics, 
accessControl);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index d0acbc6000..beb64e58f9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -33,16 +33,10 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-import io.netty.handler.ssl.ClientAuth;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.config.NettyConfig;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.TlsUtils;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.util.OsCheck;
 import org.apache.pinot.server.access.AccessControl;
@@ -68,7 +62,6 @@ public class QueryServer {
   private final Class<? extends ServerSocketChannel> _channelClass;
   private Channel _channel;
 
-
   /**
    * Create an unsecured server instance
    *
@@ -89,11 +82,10 @@ public class QueryServer {
    * @param serverMetrics server metrics
    * @param nettyConfig configurations for netty library
    * @param tlsConfig TLS/SSL config
-   * @param accessControlFactory access control factory for netty channel
+   * @param accessControl access control for netty channel
    */
   public QueryServer(int port, QueryScheduler queryScheduler, ServerMetrics 
serverMetrics, NettyConfig nettyConfig,
-      TlsConfig tlsConfig,
-      AccessControl accessControl) {
+      TlsConfig tlsConfig, AccessControl accessControl) {
     _port = port;
     _queryScheduler = queryScheduler;
     _serverMetrics = serverMetrics;
@@ -141,13 +133,15 @@ public class QueryServer {
             @Override
             protected void initChannel(SocketChannel ch) {
               if (_tlsConfig != null) {
-                attachSSLHandler(ch);
+                // Add SSL handler first to encrypt and decrypt everything.
+                ch.pipeline()
+                    .addLast(ChannelHandlerFactory.SSL, 
ChannelHandlerFactory.getServerTlsHandler(_tlsConfig, ch));
               }
 
-              ch.pipeline()
-                  .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 
0, Integer.BYTES, 0, Integer.BYTES),
-                      new LengthFieldPrepender(Integer.BYTES),
-                      new InstanceRequestHandler(_queryScheduler, 
_serverMetrics, _accessControl));
+              
ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldBasedFrameDecoder());
+              
ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldPrepender());
+              ch.pipeline().addLast(
+                  
ChannelHandlerFactory.getInstanceRequestHandler(_queryScheduler, 
_serverMetrics, _accessControl));
             }
           }).bind(_port).sync().channel();
     } catch (Exception e) {
@@ -158,30 +152,6 @@ public class QueryServer {
     }
   }
 
-  private void attachSSLHandler(SocketChannel ch) {
-    try {
-      if (_tlsConfig.getKeyStorePath() == null) {
-        throw new IllegalArgumentException("Must provide key store path for 
secured server");
-      }
-
-      SslContextBuilder sslContextBuilder = SslContextBuilder
-          .forServer(TlsUtils.createKeyManagerFactory(_tlsConfig))
-          .sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider()));
-
-      if (_tlsConfig.getTrustStorePath() != null) {
-        
sslContextBuilder.trustManager(TlsUtils.createTrustManagerFactory(_tlsConfig));
-      }
-
-      if (_tlsConfig.isClientAuthEnabled()) {
-        sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
-      }
-
-      ch.pipeline().addLast("ssl", 
sslContextBuilder.build().newHandler(ch.alloc()));
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   public void shutDown() {
     try {
       _channel.close().sync();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index aeeda17819..869e497486 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -33,10 +33,6 @@ import io.netty.channel.kqueue.KQueueSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -50,7 +46,6 @@ import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.utils.TlsUtils;
 import org.apache.pinot.core.util.OsCheck;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -162,38 +157,21 @@ public class ServerChannels {
             @Override
             protected void initChannel(SocketChannel ch) {
               if (_tlsConfig != null) {
-                attachSSLHandler(ch);
+                // Add SSL handler first to encrypt and decrypt everything.
+                ch.pipeline().addLast(
+                    ChannelHandlerFactory.SSL, 
ChannelHandlerFactory.getClientTlsHandler(_tlsConfig, ch));
               }
 
-              ch.pipeline()
-                  .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 
0, Integer.BYTES, 0, Integer.BYTES),
-                      new LengthFieldPrepender(Integer.BYTES),
-                      // NOTE: data table de-serialization happens inside this 
handler
-                      // Revisit if this becomes a bottleneck
-                      new DataTableHandler(_queryRouter, 
_serverRoutingInstance, _brokerMetrics));
+              
ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldBasedFrameDecoder());
+              
ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldPrepender());
+              // NOTE: data table de-serialization happens inside this handler
+              // Revisit if this becomes a bottleneck
+              ch.pipeline().addLast(
+                  ChannelHandlerFactory.getDataTableHandler(_queryRouter, 
_serverRoutingInstance, _brokerMetrics));
             }
           });
     }
 
-    void attachSSLHandler(SocketChannel ch) {
-      try {
-        SslContextBuilder sslContextBuilder =
-            
SslContextBuilder.forClient().sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider()));
-
-        if (_tlsConfig.getKeyStorePath() != null) {
-          
sslContextBuilder.keyManager(TlsUtils.createKeyManagerFactory(_tlsConfig));
-        }
-
-        if (_tlsConfig.getTrustStorePath() != null) {
-          
sslContextBuilder.trustManager(TlsUtils.createTrustManagerFactory(_tlsConfig));
-        }
-
-        ch.pipeline().addLast("ssl", 
sslContextBuilder.build().newHandler(ch.alloc()));
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
     void sendRequest(String rawTableName, AsyncQueryResponse 
asyncQueryResponse,
         ServerRoutingInstance serverRoutingInstance, byte[] requestBytes, long 
timeoutMs)
         throws InterruptedException, TimeoutException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to