This is an automated email from the ASF dual-hosted git repository. twolf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit 3e69f4338d6d6f2ce9ba7de7a81524ff3ad46805 Author: Thomas Wolf <tw...@apache.org> AuthorDate: Fri May 13 20:12:20 2022 +0200 Fix the Netty connection back-end Running the tests with Netty frequently resulted in failures due to connection time-outs. The root cause of this was a race condition in NettyIoConnector: if the connection succeeded and the NettyIoSession was initialized via channelActive() before the connector had set the DefaultIoConnectFuture attribute on the channel, then that future would never be signalled. Rewrite the NettyIoConnector to not use a global Bootstrap for all connections. Use a new and dedicated Bootstrap per connection. This makes it possible to use Bootstrap.attr() to have the channel attribute be set early on, before even the connection is attempted. For the context AttributeRepository, one then doesn't even need an attribute; it's available directly. Also don't set SO_BACKLOG in the connector; this socket option only makes sense for server sockets, i.e., in the acceptor. Enable running the sshd-core tests also with Netty. 
--- sshd-netty/pom.xml | 9 +++++ .../org/apache/sshd/netty/NettyIoConnector.java | 40 +++++++--------------- .../java/org/apache/sshd/netty/NettyIoService.java | 3 -- .../java/org/apache/sshd/netty/NettyIoSession.java | 13 ++++--- 4 files changed, 31 insertions(+), 34 deletions(-) diff --git a/sshd-netty/pom.xml b/sshd-netty/pom.xml index b21023811..3f4eec573 100644 --- a/sshd-netty/pom.xml +++ b/sshd-netty/pom.xml @@ -70,6 +70,14 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.sshd</groupId> + <artifactId>sshd-core</artifactId> + <version>${project.version}</version> + <classifier>reusable-tests</classifier> + <type>test-jar</type> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.sshd</groupId> <artifactId>sshd-common</artifactId> @@ -154,6 +162,7 @@ <exclude>**/ClientOpenSSHCertificatesTest.java</exclude> <exclude>**/SessionReKeyHostKeyExchangeTest.java</exclude> <!-- reading files from classpath doesn't work correctly w/ reusable test jar --> + <exclude>**/OpenSSHCertificateTest.java</exclude> <exclude>**/OpenSSHCertificateParserTest.java</exclude> <exclude>**/GenerateOpenSSHClientCertificateTest.java</exclude> <exclude>**/GenerateOpenSshClientCertificateOracleTest.java</exclude> diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoConnector.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoConnector.java index 1e9b954c6..9904e2ffa 100644 --- a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoConnector.java +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoConnector.java @@ -22,10 +22,8 @@ package org.apache.sshd.netty; import java.net.SocketAddress; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.group.DefaultChannelGroup; import 
io.netty.channel.socket.SocketChannel; @@ -50,15 +48,21 @@ public class NettyIoConnector extends NettyIoService implements IoConnector { // Shared across all connectors private static final LoggingHandler LOGGING_TRACE = new LoggingHandler(NettyIoConnector.class, LogLevel.TRACE); - protected final Bootstrap bootstrap = new Bootstrap(); - public NettyIoConnector(NettyIoServiceFactory factory, IoHandler handler) { super(factory, handler); - channelGroup = new DefaultChannelGroup("sshd-connector-channels", GlobalEventExecutor.INSTANCE); - bootstrap.group(factory.eventLoopGroup) + } + + @Override + public IoConnectFuture connect(SocketAddress address, AttributeRepository context, SocketAddress localAddress) { + if (log.isDebugEnabled()) { + log.debug("Connecting to {}", address); + } + + IoConnectFuture future = new DefaultIoConnectFuture(address, null); + Bootstrap bootstrap = new Bootstrap().group(factory.eventLoopGroup) .channel(NioSocketChannel.class) - .option(ChannelOption.SO_BACKLOG, 100) // TODO make this configurable + .attr(CONNECT_FUTURE_KEY, future) .handler(new ChannelInitializer<SocketChannel>() { @Override @SuppressWarnings("synthetic-access") @@ -66,9 +70,6 @@ public class NettyIoConnector extends NettyIoService implements IoConnector { IoServiceEventListener listener = getIoServiceEventListener(); SocketAddress local = ch.localAddress(); SocketAddress remote = ch.remoteAddress(); - AttributeRepository context = ch.hasAttr(CONTEXT_KEY) - ? 
ch.attr(CONTEXT_KEY).get() - : null; try { if (listener != null) { try { @@ -105,33 +106,18 @@ public class NettyIoConnector extends NettyIoService implements IoConnector { } } }); - } - @Override - public IoConnectFuture connect(SocketAddress address, AttributeRepository context, SocketAddress localAddress) { - boolean debugEnabled = log.isDebugEnabled(); - if (debugEnabled) { - log.debug("Connecting to {}", address); - } - - IoConnectFuture future = new DefaultIoConnectFuture(address, null); ChannelFuture chf; if (localAddress != null) { chf = bootstrap.connect(address, localAddress); } else { chf = bootstrap.connect(address); } - Channel channel = chf.channel(); - channel.attr(CONNECT_FUTURE_KEY).set(future); - if (context != null) { - channel.attr(CONTEXT_KEY).set(context); - } - chf.addListener(cf -> { - Throwable t = chf.cause(); + Throwable t = cf.cause(); if (t != null) { future.setException(t); - } else if (chf.isCancelled()) { + } else if (cf.isCancelled()) { future.cancel(); } }); diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoService.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoService.java index 8e2aaeca5..ebf0245d3 100644 --- a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoService.java +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoService.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong; import io.netty.channel.group.ChannelGroup; import io.netty.util.AttributeKey; -import org.apache.sshd.common.AttributeRepository; import org.apache.sshd.common.io.IoConnectFuture; import org.apache.sshd.common.io.IoHandler; import org.apache.sshd.common.io.IoService; @@ -42,8 +41,6 @@ public abstract class NettyIoService extends AbstractCloseable implements IoServ public static final AttributeKey<IoConnectFuture> CONNECT_FUTURE_KEY = AttributeKey.valueOf(IoConnectFuture.class.getName()); - public static final AttributeKey<AttributeRepository> CONTEXT_KEY - = 
AttributeKey.valueOf(AttributeRepository.class.getName()); protected final AtomicLong sessionSeq = new AtomicLong(); protected final Map<Long, IoSession> sessions = new ConcurrentHashMap<>(); diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java index 143e66191..d52ff0dba 100644 --- a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java @@ -212,15 +212,20 @@ public class NettyIoSession extends AbstractCloseable implements IoSession { @Override protected CloseFuture doCloseGracefully() { - context.writeAndFlush(Unpooled.EMPTY_BUFFER) - .addListener(ChannelFutureListener.CLOSE) - .addListener(fut -> closeFuture.setClosed()); + if (context != null) { + context.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE) + .addListener(fut -> closeFuture.setClosed()); + } else { + closeFuture.setClosed(); + } return closeFuture; } @Override protected void doCloseImmediately() { - context.close(); + if (context != null) { + context.close(); + } super.doCloseImmediately(); }