This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d0426e9864 Put Netty Channel handlers and Tls context creation logic at same place (#8163) d0426e9864 is described below commit d0426e9864a1ab6b7b8407603f19b7a30503e46a Author: Liang Mingqiang <secret.mqli...@gmail.com> AuthorDate: Mon Aug 1 17:45:42 2022 -0700 Put Netty Channel handlers and Tls context creation logic at same place (#8163) --- .../org/apache/pinot/common/utils/TlsUtils.java | 49 ++++++++++++ .../core/transport/ChannelHandlerFactory.java | 89 ++++++++++++++++++++++ .../apache/pinot/core/transport/QueryServer.java | 48 +++--------- .../pinot/core/transport/ServerChannels.java | 40 +++------- 4 files changed, 156 insertions(+), 70 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java index 404d03a127..0c444c2d68 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/TlsUtils.java @@ -19,6 +19,10 @@ package org.apache.pinot.common.utils; import com.google.common.base.Preconditions; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; @@ -339,4 +343,49 @@ public final class TlsUtils { return _sslSocketFactory.createSocket(host, port); } } + + /** + * Builds client side SslContext based on a given TlsConfig. + * + * @param tlsConfig TLS config + */ + public static SslContext buildClientContext(TlsConfig tlsConfig) { + SslContextBuilder sslContextBuilder = + SslContextBuilder.forClient().sslProvider(SslProvider.valueOf(tlsConfig.getSslProvider())); + if (tlsConfig.getKeyStorePath() != null) { + sslContextBuilder.keyManager(createKeyManagerFactory(tlsConfig)); + } + if (tlsConfig.getTrustStorePath() != null) { + sslContextBuilder.trustManager(createTrustManagerFactory(tlsConfig)); + } + try { + return sslContextBuilder.build(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Builds server side SslContext based on a given TlsConfig. + * + * @param tlsConfig TLS config + */ + public static SslContext buildServerContext(TlsConfig tlsConfig) { + if (tlsConfig.getKeyStorePath() == null) { + throw new IllegalArgumentException("Must provide key store path for secured server"); + } + SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(createKeyManagerFactory(tlsConfig)) + .sslProvider(SslProvider.valueOf(tlsConfig.getSslProvider())); + if (tlsConfig.getTrustStorePath() != null) { + sslContextBuilder.trustManager(createTrustManagerFactory(tlsConfig)); + } + if (tlsConfig.isClientAuthEnabled()) { + sslContextBuilder.clientAuth(ClientAuth.REQUIRE); + } + try { + return sslContextBuilder.build(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java new file mode 100644 index 0000000000..c62c266af5 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.transport; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import org.apache.pinot.common.config.TlsConfig; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.TlsUtils; +import org.apache.pinot.core.query.scheduler.QueryScheduler; +import org.apache.pinot.server.access.AccessControl; + + +/** + * The {@code ChannelHandlerFactory} provides all kinds of Netty ChannelHandlers + */ +public class ChannelHandlerFactory { + + public static final String SSL = "ssl"; + + private ChannelHandlerFactory() { + } + + /** + * The {@code getLengthFieldBasedFrameDecoder} return a decoder ChannelHandler that splits the received ByteBuffers + * dynamically by the value of the length field in the message + */ + public static ChannelHandler getLengthFieldBasedFrameDecoder() { + return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, Integer.BYTES, 0, Integer.BYTES); + } + + /** + * The {@code getLengthFieldPrepender} return an encoder ChannelHandler that prepends the length of the message. + */ + public static ChannelHandler getLengthFieldPrepender() { + return new LengthFieldPrepender(Integer.BYTES); + } + + /** + * The {@code getClientTlsHandler} return a Client side Tls handler that encrypt and decrypt everything. + */ + public static ChannelHandler getClientTlsHandler(TlsConfig tlsConfig, SocketChannel ch) { + return TlsUtils.buildClientContext(tlsConfig).newHandler(ch.alloc()); + } + + /** + * The {@code getServerTlsHandler} return a Server side Tls handler that encrypt and decrypt everything. + */ + public static ChannelHandler getServerTlsHandler(TlsConfig tlsConfig, SocketChannel ch) { + return TlsUtils.buildServerContext(tlsConfig).newHandler(ch.alloc()); + } + + /** + * The {@code getDataTableHandler} return a {@code DataTableHandler} Netty inbound handler on Pinot Broker side to + * handle the serialized data table responses sent from Pinot Server. + */ + public static ChannelHandler getDataTableHandler(QueryRouter queryRouter, ServerRoutingInstance serverRoutingInstance, + BrokerMetrics brokerMetrics) { + return new DataTableHandler(queryRouter, serverRoutingInstance, brokerMetrics); + } + + /** + * The {@code getInstanceRequestHandler} return a {@code InstanceRequestHandler} Netty inbound handler on Pinot + * Server side to handle the serialized instance requests sent from Pinot Broker. + */ + public static ChannelHandler getInstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serverMetrics, + AccessControl accessControl) { + return new InstanceRequestHandler(queryScheduler, serverMetrics, accessControl); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java index d0acbc6000..beb64e58f9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java @@ -33,16 +33,10 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.codec.LengthFieldPrepender; -import io.netty.handler.ssl.ClientAuth; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.SslProvider; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.config.NettyConfig; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.common.utils.TlsUtils; import org.apache.pinot.core.query.scheduler.QueryScheduler; import org.apache.pinot.core.util.OsCheck; import org.apache.pinot.server.access.AccessControl; @@ -68,7 +62,6 @@ public class QueryServer { private final Class<? extends ServerSocketChannel> _channelClass; private Channel _channel; - /** * Create an unsecured server instance * @@ -89,11 +82,10 @@ public class QueryServer { * @param serverMetrics server metrics * @param nettyConfig configurations for netty library * @param tlsConfig TLS/SSL config - * @param accessControlFactory access control factory for netty channel + * @param accessControl access control for netty channel */ public QueryServer(int port, QueryScheduler queryScheduler, ServerMetrics serverMetrics, NettyConfig nettyConfig, - TlsConfig tlsConfig, - AccessControl accessControl) { + TlsConfig tlsConfig, AccessControl accessControl) { _port = port; _queryScheduler = queryScheduler; _serverMetrics = serverMetrics; @@ -141,13 +133,15 @@ public class QueryServer { @Override protected void initChannel(SocketChannel ch) { if (_tlsConfig != null) { - attachSSLHandler(ch); + // Add SSL handler first to encrypt and decrypt everything. + ch.pipeline() + .addLast(ChannelHandlerFactory.SSL, ChannelHandlerFactory.getServerTlsHandler(_tlsConfig, ch)); } - ch.pipeline() - .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, Integer.BYTES, 0, Integer.BYTES), - new LengthFieldPrepender(Integer.BYTES), - new InstanceRequestHandler(_queryScheduler, _serverMetrics, _accessControl)); + ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldBasedFrameDecoder()); + ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldPrepender()); + ch.pipeline().addLast( + ChannelHandlerFactory.getInstanceRequestHandler(_queryScheduler, _serverMetrics, _accessControl)); } }).bind(_port).sync().channel(); } catch (Exception e) { @@ -158,30 +152,6 @@ public class QueryServer { } } - private void attachSSLHandler(SocketChannel ch) { - try { - if (_tlsConfig.getKeyStorePath() == null) { - throw new IllegalArgumentException("Must provide key store path for secured server"); - } - - SslContextBuilder sslContextBuilder = SslContextBuilder - .forServer(TlsUtils.createKeyManagerFactory(_tlsConfig)) - .sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider())); - - if (_tlsConfig.getTrustStorePath() != null) { - sslContextBuilder.trustManager(TlsUtils.createTrustManagerFactory(_tlsConfig)); - } - - if (_tlsConfig.isClientAuthEnabled()) { - sslContextBuilder.clientAuth(ClientAuth.REQUIRE); - } - - ch.pipeline().addLast("ssl", sslContextBuilder.build().newHandler(ch.alloc())); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - public void shutDown() { try { _channel.close().sync(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java index aeeda17819..869e497486 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java @@ -33,10 +33,6 @@ import io.netty.channel.kqueue.KQueueSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.codec.LengthFieldPrepender; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.SslProvider; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -50,7 +46,6 @@ import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.BrokerTimer; import org.apache.pinot.common.request.InstanceRequest; -import org.apache.pinot.common.utils.TlsUtils; import org.apache.pinot.core.util.OsCheck; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TCompactProtocol; @@ -162,38 +157,21 @@ public class ServerChannels { @Override protected void initChannel(SocketChannel ch) { if (_tlsConfig != null) { - attachSSLHandler(ch); + // Add SSL handler first to encrypt and decrypt everything. + ch.pipeline().addLast( + ChannelHandlerFactory.SSL, ChannelHandlerFactory.getClientTlsHandler(_tlsConfig, ch)); } - ch.pipeline() - .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, Integer.BYTES, 0, Integer.BYTES), - new LengthFieldPrepender(Integer.BYTES), - // NOTE: data table de-serialization happens inside this handler - // Revisit if this becomes a bottleneck - new DataTableHandler(_queryRouter, _serverRoutingInstance, _brokerMetrics)); + ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldBasedFrameDecoder()); + ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldPrepender()); + // NOTE: data table de-serialization happens inside this handler + // Revisit if this becomes a bottleneck + ch.pipeline().addLast( + ChannelHandlerFactory.getDataTableHandler(_queryRouter, _serverRoutingInstance, _brokerMetrics)); } }); } - void attachSSLHandler(SocketChannel ch) { - try { - SslContextBuilder sslContextBuilder = - SslContextBuilder.forClient().sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider())); - - if (_tlsConfig.getKeyStorePath() != null) { - sslContextBuilder.keyManager(TlsUtils.createKeyManagerFactory(_tlsConfig)); - } - - if (_tlsConfig.getTrustStorePath() != null) { - sslContextBuilder.trustManager(TlsUtils.createTrustManagerFactory(_tlsConfig)); - } - - ch.pipeline().addLast("ssl", sslContextBuilder.build().newHandler(ch.alloc())); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse, ServerRoutingInstance serverRoutingInstance, byte[] requestBytes, long timeoutMs) throws InterruptedException, TimeoutException { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org