This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9a49ab2ded Enhance event loop initialization (#9094) 9a49ab2ded is described below commit 9a49ab2dedb4d9157af99cd42403f4dad56b2b90 Author: Jia Guo <jia...@linkedin.com> AuthorDate: Mon Jul 25 11:08:53 2022 -0700 Enhance event loop initialization (#9094) * Enhance event loop initialization --- .../apache/pinot/core/transport/QueryServer.java | 35 +++++++++++++++++----- .../pinot/core/transport/ServerChannels.java | 28 ++++++++++++++--- 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java index f2b504a89f..d0acbc6000 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java @@ -23,8 +23,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.kqueue.KQueue; import io.netty.channel.kqueue.KQueueEventLoopGroup; import io.netty.channel.kqueue.KQueueServerSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; @@ -45,6 +47,8 @@ import org.apache.pinot.core.query.scheduler.QueryScheduler; import org.apache.pinot.core.util.OsCheck; import org.apache.pinot.server.access.AccessControl; import org.apache.pinot.server.access.AllowAllAccessFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -52,15 +56,16 @@ import org.apache.pinot.server.access.AllowAllAccessFactory; * Brokers. */ public class QueryServer { + private static final Logger LOGGER = LoggerFactory.getLogger(QueryServer.class); private final int _port; private final QueryScheduler _queryScheduler; private final ServerMetrics _serverMetrics; private final TlsConfig _tlsConfig; private final AccessControl _accessControl; - private EventLoopGroup _bossGroup; - private EventLoopGroup _workerGroup; - private Class<? extends ServerSocketChannel> _channelClass; + private final EventLoopGroup _bossGroup; + private final EventLoopGroup _workerGroup; + private final Class<? extends ServerSocketChannel> _channelClass; private Channel _channel; @@ -94,20 +99,36 @@ public class QueryServer { _serverMetrics = serverMetrics; _tlsConfig = tlsConfig; _accessControl = accessControl; - if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled() - && OsCheck.getOperatingSystemType() == OsCheck.OSType.Linux) { + + boolean enableNativeTransports = nettyConfig != null && nettyConfig.isNativeTransportsEnabled(); + OsCheck.OSType operatingSystemType = OsCheck.getOperatingSystemType(); + if (enableNativeTransports + && operatingSystemType == OsCheck.OSType.Linux + && Epoll.isAvailable()) { _bossGroup = new EpollEventLoopGroup(); _workerGroup = new EpollEventLoopGroup(); _channelClass = EpollServerSocketChannel.class; - } else if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled() - && OsCheck.getOperatingSystemType() == OsCheck.OSType.MacOS) { + LOGGER.info("Using Epoll event loop"); + } else if (enableNativeTransports + && operatingSystemType == OsCheck.OSType.MacOS + && KQueue.isAvailable()) { _bossGroup = new KQueueEventLoopGroup(); _workerGroup = new KQueueEventLoopGroup(); _channelClass = KQueueServerSocketChannel.class; + LOGGER.info("Using KQueue event loop"); } else { _bossGroup = new NioEventLoopGroup(); _workerGroup = new NioEventLoopGroup(); _channelClass = NioServerSocketChannel.class; + StringBuilder log = new StringBuilder("Using NIO event loop"); + if (operatingSystemType == OsCheck.OSType.Linux + && enableNativeTransports) { + log.append(", as Epoll is not available: ").append(Epoll.unavailabilityCause()); + } else if (operatingSystemType == OsCheck.OSType.MacOS + && enableNativeTransports) { + log.append(", as KQueue is not available: ").append(KQueue.unavailabilityCause()); + } + LOGGER.info(log.toString()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java index 8b76b8ed22..aeeda17819 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java @@ -24,8 +24,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.kqueue.KQueue; import io.netty.channel.kqueue.KQueueEventLoopGroup; import io.netty.channel.kqueue.KQueueSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; @@ -53,6 +55,8 @@ import org.apache.pinot.core.util.OsCheck; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -62,6 +66,7 @@ import org.apache.thrift.transport.TTransportException; */ @ThreadSafe public class ServerChannels { + private static final Logger LOGGER = LoggerFactory.getLogger(ServerChannels.class); public static final String CHANNEL_LOCK_TIMEOUT_MSG = "Timeout while acquiring channel lock"; private static final long TRY_CONNECT_CHANNEL_LOCK_TIMEOUT_MS = 5_000L; @@ -83,17 +88,32 @@ public class ServerChannels { */ public ServerChannels(QueryRouter queryRouter, BrokerMetrics brokerMetrics, @Nullable NettyConfig nettyConfig, @Nullable TlsConfig tlsConfig) { - if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled() - && OsCheck.getOperatingSystemType() == OsCheck.OSType.Linux) { + boolean enableNativeTransports = nettyConfig != null && nettyConfig.isNativeTransportsEnabled(); + OsCheck.OSType operatingSystemType = OsCheck.getOperatingSystemType(); + if (enableNativeTransports + && operatingSystemType == OsCheck.OSType.Linux + && Epoll.isAvailable()) { _eventLoopGroup = new EpollEventLoopGroup(); _channelClass = EpollSocketChannel.class; - } else if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled() - && OsCheck.getOperatingSystemType() == OsCheck.OSType.MacOS) { + LOGGER.info("Using Epoll event loop"); + } else if (enableNativeTransports + && operatingSystemType == OsCheck.OSType.MacOS + && KQueue.isAvailable()) { _eventLoopGroup = new KQueueEventLoopGroup(); _channelClass = KQueueSocketChannel.class; + LOGGER.info("Using KQueue event loop"); } else { _eventLoopGroup = new NioEventLoopGroup(); _channelClass = NioSocketChannel.class; + StringBuilder log = new StringBuilder("Using NIO event loop"); + if (operatingSystemType == OsCheck.OSType.Linux + && enableNativeTransports) { + log.append(", as Epoll is not available: ").append(Epoll.unavailabilityCause()); + } else if (operatingSystemType == OsCheck.OSType.MacOS + && enableNativeTransports) { + log.append(", as KQueue is not available: ").append(KQueue.unavailabilityCause()); + } + LOGGER.info(log.toString()); } _queryRouter = queryRouter; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org