This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit a3bca7b22eaddb54b51c9b1b5ca264dac10b7903
Author: Lyor Goldstein <lgoldst...@apache.org>
AuthorDate: Wed Jan 22 18:59:26 2020 +0200

    [SSHD-964] Allow sending SSH_MSG_CHANNEL_EOF while channel is being closed 
gracefully
---
 CHANGES.md                                         |  2 +
 .../common/util/closeable/AbstractCloseable.java   | 16 +++++--
 .../sshd/client/channel/AbstractClientChannel.java |  6 ++-
 .../sshd/common/channel/AbstractChannel.java       | 50 ++++++++++++++--------
 .../common/channel/ChannelAsyncOutputStream.java   |  2 +-
 .../common/forward/DefaultForwardingFilter.java    | 14 ++----
 .../org/apache/sshd/common/forward/SocksProxy.java |  4 +-
 .../common/session/helpers/AbstractSession.java    |  4 +-
 8 files changed, 60 insertions(+), 38 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 5c5e872..dd584ff 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,3 +11,5 @@
 ## Minor code helpers
 
 ## Behavioral changes and enhancements
+
+* [SSHD-964](https://issues.apache.org/jira/browse/SSHD-964) - Send 
SSH_MSG_CHANNEL_EOF when tunnel channel being closed.
diff --git 
a/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
 
b/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
index e503431..46eae38 100644
--- 
a/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
+++ 
b/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
@@ -26,14 +26,22 @@ import org.apache.sshd.common.future.SshFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 
 /**
- * Provides some default implementations
+ * Provides some default implementations for managing channel/connection 
open/close state
  *
  * @author <a href="mailto:d...@mina.apache.org";>Apache MINA SSHD Project</a>
  */
 public abstract class AbstractCloseable extends IoBaseCloseable {
 
     public enum State {
-        Opened, Graceful, Immediate, Closed
+        /** Connection is open */
+        Opened,
+        /** Connection is being closed gracefully */
+        Graceful,
+        /** Connection is being terminated immediately */
+        Immediate,
+        /** Connection is closed */
+        Closed,
+        /* end */;
     }
 
     /**
@@ -90,7 +98,7 @@ public abstract class AbstractCloseable extends 
IoBaseCloseable {
                 }
             } else {
                 if (debugEnabled) {
-                    log.debug("close({})[Immediately] state already {}", this, 
state.get());
+                    log.debug("close({})[Immediately] state already {}", this, 
state);
                 }
             }
         } else {
@@ -119,7 +127,7 @@ public abstract class AbstractCloseable extends 
IoBaseCloseable {
                 }
             } else {
                 if (debugEnabled) {
-                    log.debug("close({})[Graceful] state already {}", this, 
state.get());
+                    log.debug("close({})[Graceful] state already {}", this, 
state);
                 }
             }
         }
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 93832c6..3a51dda 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -318,7 +318,7 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
     @Override
     public synchronized OpenFuture open() throws IOException {
         if (isClosing()) {
-            throw new SshException("Session has been closed");
+            throw new SshException("Session has been closed: " + state);
         }
 
         openFuture = new DefaultOpenFuture(this.toString(), futureLock);
@@ -399,6 +399,10 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
     protected void doWriteData(byte[] data, int off, long len) throws 
IOException {
         // If we're already closing, ignore incoming data
         if (isClosing()) {
+            if (log.isDebugEnabled()) {
+                log.debug("doWriteData({}) ignored (len={}) channel state={}", 
this, len, state);
+            }
+
             return;
         }
         ValidateUtils.checkTrue(
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index a5b788e..d8ebe09 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -780,19 +780,19 @@ public abstract class AbstractChannel
 
     @Override
     public IoWriteFuture writePacket(Buffer buffer) throws IOException {
-        Session s = getSession();
         if (!isClosing()) {
+            Session s = getSession();
             return s.writePacket(buffer);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("writePacket({}) Discarding output packet because 
channel is being closed", this);
-            }
-            return new AbstractIoWriteFuture(s.toString(), null) {
-                {
-                    setValue(new EOFException("Channel is being closed"));
-                }
-            };
         }
+
+        if (log.isDebugEnabled()) {
+            log.debug("writePacket({}) Discarding output packet because 
channel state={}", this, state);
+        }
+        return new AbstractIoWriteFuture(toString(), null) {
+            {
+                setValue(new EOFException("Channel is being closed"));
+            }
+        };
     }
 
     @Override
@@ -921,29 +921,43 @@ public abstract class AbstractChannel
 
     protected abstract void doWriteExtendedData(byte[] data, int off, long 
len) throws IOException;
 
-    protected void sendEof() throws IOException {
-        if (isClosing()) {
+    /**
+     * Sends {@code SSH_MSG_CHANNEL_EOF} provided not already sent
+     * and current channel state allows it.
+     *
+     * @return The {@link IoWriteFuture} of the sent packet - {@code null}
+     * if message not sent due to channel state (or already sent)
+     * @throws IOException If failed to send the packet
+     */
+    protected IoWriteFuture sendEof() throws IOException {
+        State channelState = state.get();
+        // OK to send EOF if channel is open or being closed gracefully
+        if ((channelState != State.Opened) && (channelState != 
State.Graceful)) {
             if (log.isDebugEnabled()) {
-                log.debug("sendEof({}) already closing or closed", this);
+                log.debug("sendEof({}) already closing or closed - state={}", 
this, state);
             }
-            return;
+            return null;
         }
 
         if (eofSent.getAndSet(true)) {
             if (log.isDebugEnabled()) {
-                log.debug("sendEof({}) already sent", this);
+                log.debug("sendEof({}) already sent (state={})", this, 
channelState);
             }
-            return;
+            return null;
         }
 
         if (log.isDebugEnabled()) {
-            log.debug("sendEof({}) SSH_MSG_CHANNEL_EOF", this);
+            log.debug("sendEof({}) SSH_MSG_CHANNEL_EOF (state={})", this, 
channelState);
         }
 
         Session s = getSession();
         Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_EOF, 
Short.SIZE);
         buffer.putInt(getRecipient());
-        writePacket(buffer);
+        /*
+         * The default "writePacket" does not send packets if state
+         * is not open so we need to bypass it.
+         */
+        return s.writePacket(buffer);
     }
 
     public boolean isEofSent() {
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index b0e715e..e2f1ebd 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -60,7 +60,7 @@ public class ChannelAsyncOutputStream extends 
AbstractCloseable implements IoOut
     @Override
     public synchronized IoWriteFuture writePacket(Buffer buffer) throws 
IOException {
         if (isClosing()) {
-            throw new EOFException("Closed");
+            throw new EOFException("Closing: " + state);
         }
 
         IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId, 
buffer);
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
index 8836ece..7673920 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
@@ -192,11 +192,8 @@ public class DefaultForwardingFilter
         ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: 
%s", local);
         Objects.requireNonNull(remote, "Remote address is null");
 
-        if (isClosed()) {
-            throw new IllegalStateException("TcpipForwarder is closed");
-        }
-        if (isClosing()) {
-            throw new IllegalStateException("TcpipForwarder is closing");
+        if (isClosed() || isClosing()) {
+            throw new IllegalStateException("TcpipForwarder is closed or 
closing: " + state);
         }
 
         InetSocketAddress bound = null;
@@ -453,11 +450,8 @@ public class DefaultForwardingFilter
         Objects.requireNonNull(local, "Local address is null");
         ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: 
%s", local);
 
-        if (isClosed()) {
-            throw new IllegalStateException("TcpipForwarder is closed");
-        }
-        if (isClosing()) {
-            throw new IllegalStateException("TcpipForwarder is closing");
+        if (isClosed() || isClosing()) {
+            throw new IllegalStateException("TcpipForwarder is closed or 
closing: " + state);
         }
 
         SocksProxy proxy = null;
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
index c59cef2..77bb724 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
@@ -52,7 +52,7 @@ public class SocksProxy extends AbstractCloseable implements 
IoHandler {
     @Override
     public void sessionCreated(IoSession session) throws Exception {
         if (isClosing()) {
-            throw new SshException("SocksProxy is closing or closed");
+            throw new SshException("SocksProxy is closing or closed: " + 
state);
         }
     }
 
@@ -65,7 +65,7 @@ public class SocksProxy extends AbstractCloseable implements 
IoHandler {
     }
 
     @Override
-    public void messageReceived(final IoSession session, 
org.apache.sshd.common.util.Readable message) throws Exception {
+    public void messageReceived(IoSession session, 
org.apache.sshd.common.util.Readable message) throws Exception {
         Buffer buffer = new ByteArrayBuffer(message.available() + Long.SIZE, 
false);
         buffer.putBuffer(message);
         Proxy proxy = proxies.get(session);
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 54d9929..4468276 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -1942,7 +1942,7 @@ public abstract class AbstractSession extends 
SessionHelper {
         }
 
         return ValidateUtils.checkNotNull(
-            kexFutureHolder.get(), "No current KEX future on state=%s", 
kexState.get());
+            kexFutureHolder.get(), "No current KEX future on state=%s", 
kexState);
     }
 
     /**
@@ -1970,7 +1970,7 @@ public abstract class AbstractSession extends 
SessionHelper {
     protected KeyExchangeFuture requestNewKeysExchange() throws IOException, 
GeneralSecurityException {
         if (!kexState.compareAndSet(KexState.DONE, KexState.INIT)) {
             if (log.isDebugEnabled()) {
-                log.debug("requestNewKeysExchange({}) KEX state not DONE: {}", 
this, kexState.get());
+                log.debug("requestNewKeysExchange({}) KEX state not DONE: {}", 
this, kexState);
             }
 
             return null;

Reply via email to