[SSHD-776] SSHD local port forwarding close session unexpectedly

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/0e99597a
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/0e99597a
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/0e99597a

Branch: refs/heads/master
Commit: 0e99597aceb035d271f0140348df23d32b5e3bcd
Parents: 2529a4c
Author: Fulvio Cavarretta <fulvio.cavarre...@primeur.com>
Authored: Wed Oct 11 18:21:11 2017 +0300
Committer: Lyor Goldstein <lyor.goldst...@gmail.com>
Committed: Wed Oct 11 20:35:15 2017 +0300

----------------------------------------------------------------------
 .../sshd/common/channel/AbstractChannel.java    |  1 +
 .../apache/sshd/common/io/nio2/Nio2Session.java | 66 ++++++++++++--------
 .../helpers/AbstractConnectionService.java      | 15 ++++-
 .../sshd/server/forward/TcpipServerChannel.java | 63 ++++++++++---------
 4 files changed, 89 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 2a6c0e9..e1eebef 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -776,6 +776,7 @@ public abstract class AbstractChannel
             throw err;
         }
     }
+
     @Override
     protected void doCloseImmediately() {
         if (service != null) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java 
b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index 0401224..a846562 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -133,11 +133,11 @@ public class Nio2Session extends AbstractCloseable 
implements IoSession {
     @Override
     public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         if (log.isDebugEnabled()) {
-            log.debug("Writing {} bytes", buffer.available());
+            log.debug("writePacket({}) Writing {} bytes", this, 
buffer.available());
         }
 
         ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), 
buffer.available());
-        final Nio2DefaultIoWriteFuture future = new 
Nio2DefaultIoWriteFuture(null, buf);
+        Nio2DefaultIoWriteFuture future = new Nio2DefaultIoWriteFuture(null, 
buf);
         if (isClosing()) {
             Throwable exc = new ClosedChannelException();
             future.setException(exc);
@@ -150,32 +150,33 @@ public class Nio2Session extends AbstractCloseable 
implements IoSession {
     }
 
     protected void exceptionCaught(Throwable exc) {
-        if (!closeFuture.isClosed()) {
-            AsynchronousSocketChannel socket = getSocket();
-            if (isClosing() || !socket.isOpen()) {
-                close(true);
-            } else {
-                IoHandler handler = getIoHandler();
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("exceptionCaught({}) caught {}[{}] - calling 
handler",
-                                  this, exc.getClass().getSimpleName(), 
exc.getMessage());
-                    }
-                    handler.exceptionCaught(this, exc);
-                } catch (Throwable e) {
-                    Throwable t = GenericUtils.peelException(e);
-                    if (log.isDebugEnabled()) {
-                        log.debug("exceptionCaught({}) Exception handler threw 
{}, closing the session: {}",
-                                  this, t.getClass().getSimpleName(), 
t.getMessage());
-                    }
+        if (closeFuture.isClosed()) {
+            return;
+        }
 
-                    if (log.isTraceEnabled()) {
-                        log.trace("exceptionCaught(" + this + ") exception 
handler failure details", t);
-                    }
-                    close(true);
+        AsynchronousSocketChannel socket = getSocket();
+        if (isOpen() && socket.isOpen()) {
+            IoHandler handler = getIoHandler();
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("exceptionCaught({}) caught {}[{}] - calling 
handler",
+                              this, exc.getClass().getSimpleName(), 
exc.getMessage());
+                }
+                handler.exceptionCaught(this, exc);
+            } catch (Throwable e) {
+                Throwable t = GenericUtils.peelException(e);
+                if (log.isDebugEnabled()) {
+                    log.debug("exceptionCaught({}) Exception handler threw {}, 
closing the session: {}",
+                              this, t.getClass().getSimpleName(), 
t.getMessage());
+                }
+
+                if (log.isTraceEnabled()) {
+                    log.trace("exceptionCaught(" + this + ") exception handler 
failure details", t);
                 }
             }
         }
+
+        close(true);
     }
 
     @Override
@@ -193,12 +194,22 @@ public class Nio2Session extends AbstractCloseable 
implements IoSession {
 
     @Override
     protected void doCloseImmediately() {
-        for (;;) {
+        for (boolean debugEnabled = log.isDebugEnabled();;) {
             // Cancel pending requests informing them of the cancellation
             Nio2DefaultIoWriteFuture future = writes.poll();
             if (future != null) {
+                if (future.isWritten()) {
+                    if (debugEnabled) {
+                        log.debug("doCloseImmediately({}) skip already written 
future={}", this, future);
+                    }
+                    continue;
+                }
+
                 Throwable error = future.getException();
                 if (error == null) {
+                    if (debugEnabled) {
+                        log.debug("doCloseImmediately({}) signal write abort 
for future={}", this, future);
+                    }
                     future.setException(new WriteAbortedException("Write 
request aborted due to immediate session close", null));
                 }
             } else {
@@ -379,6 +390,11 @@ public class Nio2Session extends AbstractCloseable 
implements IoSession {
             if (log.isDebugEnabled()) {
                 log.debug("handleCompletedWriteCycle({}) finished writing 
len={}", this, writeLen);
             }
+
+            // This should be called before future.setWritten() to avoid 
WriteAbortedException
+            // to be thrown by doCloseImmediately when called in the listener 
of doCloseGracefully
+            writes.remove(future);
+
             future.setWritten();
             finishWrite(future);
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 1524196..14db9b1 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -307,7 +307,8 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession>
      */
     @Override
     public void unregisterChannel(Channel channel) {
-        Channel result = channels.remove(channel.getId());
+        int channelId = channel.getId();
+        Channel result = channels.remove(channelId);
         if (log.isDebugEnabled()) {
             log.debug("unregisterChannel({}) result={}", channel, result);
         }
@@ -436,8 +437,16 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession>
      */
     public void channelWindowAdjust(Buffer buffer) throws IOException {
         try {
-            Channel channel = getChannel(buffer);
-            channel.handleWindowAdjust(buffer);
+            // Do not use getChannel to avoid the session being closed
+            // if receiving the SSH_MSG_CHANNEL_WINDOW_ADJUST on an already 
closed channel
+            int recipient = buffer.getInt();
+            Channel channel = channels.get(recipient);
+            if (channel != null) {
+                channel.handleWindowAdjust(buffer);
+            } else {
+                log.warn("Received SSH_MSG_CHANNEL_WINDOW_ADJUST on unknown 
channel " + recipient);
+            }
+
         } catch (SshException e) {
             if (log.isDebugEnabled()) {
                 log.debug("channelWindowAdjust {} error: {}", 
e.getClass().getSimpleName(), e.getMessage());

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 57a5699..317e669 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -232,7 +232,7 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
     protected void handleChannelOpenFailure(OpenFuture f, Throwable problem) {
         signalChannelOpenFailure(problem);
         notifyStateChanged(problem.getClass().getSimpleName());
-        closeImmediately0();
+        close(true);
 
         if (problem instanceof ConnectException) {
             f.setException(new SshChannelOpenException(getId(), 
SshConstants.SSH_OPEN_CONNECT_FAILED, problem.getMessage(), problem));
@@ -241,44 +241,43 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
         }
 
     }
-    private void closeImmediately0() {
-        // We need to close the channel immediately to remove it from the
-        // server session's channel table and *not* send a packet to the
-        // client.  A notification was already sent by our caller, or will
-        // be sent after we return.
-        //
-        super.close(true);
+
+    @Override
+    public CloseFuture close(boolean immediately) {
+        CloseFuture closingFeature = super.close(immediately);
 
         // We also need to dispose of the connector, but unfortunately we
         // are being invoked by the connector thread or the connector's
-        // own processor thread.  Disposing of the connector within either
-        // causes deadlock.  Instead create a thread to dispose of the
+        // own processor thread. Disposing of the connector within either
+        // causes deadlock. Instead create a thread to dispose of the
         // connector in the background.
-
         ExecutorService service = getExecutorService();
+
         // allocate a temporary executor service if none provided
-        final ExecutorService executors = (service == null)
+        ExecutorService executors = (service == null)
                 ? 
ThreadUtils.newSingleThreadExecutor("TcpIpServerChannel-ConnectorCleanup[" + 
getSession() + "]")
                 : service;
         // shutdown the temporary executor service if had to create it
-        final boolean shutdown = executors != service || isShutdownOnExit();
-        executors.submit(() -> {
-            try {
-                connector.close(true);
-            } finally {
-                if (shutdown && !executors.isShutdown()) {
-                    Collection<Runnable> runners = executors.shutdownNow();
+        boolean shutdown = (executors != service) || isShutdownOnExit();
+
+        return builder().when(closingFeature).run(() -> {
+            executors.submit(() -> {
+                try {
                     if (log.isDebugEnabled()) {
-                        log.debug("destroy({}) - shutdown executor service - 
runners count={}", TcpipServerChannel.this, runners.size());
+                        log.debug("disposing connector: {} for: {}", 
connector, TcpipServerChannel.this);
+                    }
+                    connector.close(immediately);
+                } finally {
+                    if (shutdown && (!executors.isShutdown())) {
+                        Collection<Runnable> runners = executors.shutdownNow();
+                        if (log.isDebugEnabled()) {
+                            log.debug("destroy({}) - shutdown executor service 
- runners count={}",
+                                      TcpipServerChannel.this, runners.size());
+                        }
                     }
                 }
-            }
-        });
-    }
-
-    @Override
-    public CloseFuture close(boolean immediately) {
-        return super.close(immediately).addListener(sshFuture -> 
closeImmediately0());
+            });
+        }).build().close(false);
     }
 
     @Override
@@ -328,6 +327,14 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
                     + " len=" + len + " write failure details", t);
         }
 
-        session.exceptionCaught(t);
+        if (ioSession.isOpen()) {
+            session.exceptionCaught(t);
+        } else {
+            // In case remote entity has closed the socket (the ioSession), 
data coming from
+            // the SSH channel should be simply discarded
+            if (log.isDebugEnabled()) {
+                log.debug("Ignoring writeDataFailure {} because ioSession {} 
is already closing ", t, ioSession);
+            }
+        }
     }
 }

Reply via email to