[SSHD-776] SSHD local port forwarding close session unexpectedly
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/0e99597a Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/0e99597a Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/0e99597a Branch: refs/heads/master Commit: 0e99597aceb035d271f0140348df23d32b5e3bcd Parents: 2529a4c Author: Fulvio Cavarretta <fulvio.cavarre...@primeur.com> Authored: Wed Oct 11 18:21:11 2017 +0300 Committer: Lyor Goldstein <lyor.goldst...@gmail.com> Committed: Wed Oct 11 20:35:15 2017 +0300 ---------------------------------------------------------------------- .../sshd/common/channel/AbstractChannel.java | 1 + .../apache/sshd/common/io/nio2/Nio2Session.java | 66 ++++++++++++-------- .../helpers/AbstractConnectionService.java | 15 ++++- .../sshd/server/forward/TcpipServerChannel.java | 63 ++++++++++--------- 4 files changed, 89 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index 2a6c0e9..e1eebef 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -776,6 +776,7 @@ public abstract class AbstractChannel throw err; } } + @Override protected void doCloseImmediately() { if (service != null) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java index 0401224..a846562 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java @@ -133,11 +133,11 @@ public class Nio2Session extends AbstractCloseable implements IoSession { @Override public IoWriteFuture writePacket(Buffer buffer) throws IOException { if (log.isDebugEnabled()) { - log.debug("Writing {} bytes", buffer.available()); + log.debug("writePacket({}) Writing {} bytes", this, buffer.available()); } ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available()); - final Nio2DefaultIoWriteFuture future = new Nio2DefaultIoWriteFuture(null, buf); + Nio2DefaultIoWriteFuture future = new Nio2DefaultIoWriteFuture(null, buf); if (isClosing()) { Throwable exc = new ClosedChannelException(); future.setException(exc); @@ -150,32 +150,33 @@ public class Nio2Session extends AbstractCloseable implements IoSession { } protected void exceptionCaught(Throwable exc) { - if (!closeFuture.isClosed()) { - AsynchronousSocketChannel socket = getSocket(); - if (isClosing() || !socket.isOpen()) { - close(true); - } else { - IoHandler handler = getIoHandler(); - try { - if (log.isDebugEnabled()) { - log.debug("exceptionCaught({}) caught {}[{}] - calling handler", - this, exc.getClass().getSimpleName(), exc.getMessage()); - } - handler.exceptionCaught(this, exc); - } catch (Throwable e) { - Throwable t = GenericUtils.peelException(e); - if (log.isDebugEnabled()) { - log.debug("exceptionCaught({}) Exception handler threw {}, closing the session: {}", - this, t.getClass().getSimpleName(), t.getMessage()); - } + if (closeFuture.isClosed()) { + return; + } - if (log.isTraceEnabled()) { - log.trace("exceptionCaught(" + this + ") exception handler failure details", t); - } - close(true); + AsynchronousSocketChannel socket = getSocket(); + if (isOpen() && socket.isOpen()) { + IoHandler handler = getIoHandler(); + try { + if (log.isDebugEnabled()) { + log.debug("exceptionCaught({}) caught {}[{}] - calling handler", + this, exc.getClass().getSimpleName(), exc.getMessage()); + } + handler.exceptionCaught(this, exc); + } catch (Throwable e) { + Throwable t = GenericUtils.peelException(e); + if (log.isDebugEnabled()) { + log.debug("exceptionCaught({}) Exception handler threw {}, closing the session: {}", + this, t.getClass().getSimpleName(), t.getMessage()); + } + + if (log.isTraceEnabled()) { + log.trace("exceptionCaught(" + this + ") exception handler failure details", t); } } } + + close(true); } @Override @@ -193,12 +194,22 @@ public class Nio2Session extends AbstractCloseable implements IoSession { @Override protected void doCloseImmediately() { - for (;;) { + for (boolean debugEnabled = log.isDebugEnabled();;) { // Cancel pending requests informing them of the cancellation Nio2DefaultIoWriteFuture future = writes.poll(); if (future != null) { + if (future.isWritten()) { + if (debugEnabled) { + log.debug("doCloseImmediately({}) skip already written future={}", this, future); + } + continue; + } + Throwable error = future.getException(); if (error == null) { + if (debugEnabled) { + log.debug("doCloseImmediately({}) signal write abort for future={}", this, future); + } future.setException(new WriteAbortedException("Write request aborted due to immediate session close", null)); } } else { @@ -379,6 +390,11 @@ public class Nio2Session extends AbstractCloseable implements IoSession { if (log.isDebugEnabled()) { log.debug("handleCompletedWriteCycle({}) finished writing len={}", this, writeLen); } + + // This should be called before future.setWritten() to avoid WriteAbortedException + // to be thrown by doCloseImmediately when called in the listener of doCloseGracefully + writes.remove(future); + future.setWritten(); finishWrite(future); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java index 1524196..14db9b1 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java @@ -307,7 +307,8 @@ public abstract class AbstractConnectionService<S extends AbstractSession> */ @Override public void unregisterChannel(Channel channel) { - Channel result = channels.remove(channel.getId()); + int channelId = channel.getId(); + Channel result = channels.remove(channelId); if (log.isDebugEnabled()) { log.debug("unregisterChannel({}) result={}", channel, result); } @@ -436,8 +437,16 @@ public abstract class AbstractConnectionService<S extends AbstractSession> */ public void channelWindowAdjust(Buffer buffer) throws IOException { try { - Channel channel = getChannel(buffer); - channel.handleWindowAdjust(buffer); + // Do not use getChannel to avoid the session being closed + // if receiving the SSH_MSG_CHANNEL_WINDOW_ADJUST on an already closed channel + int recipient = buffer.getInt(); + Channel channel = channels.get(recipient); + if (channel != null) { + channel.handleWindowAdjust(buffer); + } else { + log.warn("Received SSH_MSG_CHANNEL_WINDOW_ADJUST on unknown channel " + recipient); + } + } catch (SshException e) { if (log.isDebugEnabled()) { log.debug("channelWindowAdjust {} error: {}", e.getClass().getSimpleName(), e.getMessage()); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java index 57a5699..317e669 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java @@ -232,7 +232,7 @@ public class TcpipServerChannel extends AbstractServerChannel { protected void handleChannelOpenFailure(OpenFuture f, Throwable problem) { signalChannelOpenFailure(problem); notifyStateChanged(problem.getClass().getSimpleName()); - closeImmediately0(); + close(true); if (problem instanceof ConnectException) { f.setException(new SshChannelOpenException(getId(), SshConstants.SSH_OPEN_CONNECT_FAILED, problem.getMessage(), problem)); @@ -241,44 +241,43 @@ public class TcpipServerChannel extends AbstractServerChannel { } } - private void closeImmediately0() { - // We need to close the channel immediately to remove it from the - // server session's channel table and *not* send a packet to the - // client. A notification was already sent by our caller, or will - // be sent after we return. - // - super.close(true); + + @Override + public CloseFuture close(boolean immediately) { + CloseFuture closingFeature = super.close(immediately); // We also need to dispose of the connector, but unfortunately we // are being invoked by the connector thread or the connector's - // own processor thread. Disposing of the connector within either - // causes deadlock. Instead create a thread to dispose of the + // own processor thread. Disposing of the connector within either + // causes deadlock. Instead create a thread to dispose of the // connector in the background. - ExecutorService service = getExecutorService(); + // allocate a temporary executor service if none provided - final ExecutorService executors = (service == null) + ExecutorService executors = (service == null) ? ThreadUtils.newSingleThreadExecutor("TcpIpServerChannel-ConnectorCleanup[" + getSession() + "]") : service; // shutdown the temporary executor service if had to create it - final boolean shutdown = executors != service || isShutdownOnExit(); - executors.submit(() -> { - try { - connector.close(true); - } finally { - if (shutdown && !executors.isShutdown()) { - Collection<Runnable> runners = executors.shutdownNow(); + boolean shutdown = (executors != service) || isShutdownOnExit(); + + return builder().when(closingFeature).run(() -> { + executors.submit(() -> { + try { if (log.isDebugEnabled()) { - log.debug("destroy({}) - shutdown executor service - runners count={}", TcpipServerChannel.this, runners.size()); + log.debug("disposing connector: {} for: {}", connector, TcpipServerChannel.this); + } + connector.close(immediately); + } finally { + if (shutdown && (!executors.isShutdown())) { + Collection<Runnable> runners = executors.shutdownNow(); + if (log.isDebugEnabled()) { + log.debug("destroy({}) - shutdown executor service - runners count={}", + TcpipServerChannel.this, runners.size()); + } } } - } - }); - } - - @Override - public CloseFuture close(boolean immediately) { - return super.close(immediately).addListener(sshFuture -> closeImmediately0()); + }); + }).build().close(false); } @Override @@ -328,6 +327,14 @@ public class TcpipServerChannel extends AbstractServerChannel { + " len=" + len + " write failure details", t); } - session.exceptionCaught(t); + if (ioSession.isOpen()) { + session.exceptionCaught(t); + } else { + // In case remote entity has closed the socket (the ioSession), data coming from + // the SSH channel should be simply discarded + if (log.isDebugEnabled()) { + log.debug("Ignoring writeDataFailure {} because ioSession {} is already closing ", t, ioSession); + } + } } }