This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit a3bca7b22eaddb54b51c9b1b5ca264dac10b7903 Author: Lyor Goldstein <lgoldst...@apache.org> AuthorDate: Wed Jan 22 18:59:26 2020 +0200 [SSHD-964] Allow sending SSH_MSG_CHANNEL_EOF while channel is being closed gracefully --- CHANGES.md | 2 + .../common/util/closeable/AbstractCloseable.java | 16 +++++-- .../sshd/client/channel/AbstractClientChannel.java | 6 ++- .../sshd/common/channel/AbstractChannel.java | 50 ++++++++++++++-------- .../common/channel/ChannelAsyncOutputStream.java | 2 +- .../common/forward/DefaultForwardingFilter.java | 14 ++---- .../org/apache/sshd/common/forward/SocksProxy.java | 4 +- .../common/session/helpers/AbstractSession.java | 4 +- 8 files changed, 60 insertions(+), 38 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 5c5e872..dd584ff 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,3 +11,5 @@ ## Minor code helpers ## Behavioral changes and enhancements + +* [SSHD-964](https://issues.apache.org/jira/browse/SSHD-964) - Send SSH_MSG_CHANNEL_EOF when tunnel channel being closed. diff --git a/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java b/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java index e503431..46eae38 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java @@ -26,14 +26,22 @@ import org.apache.sshd.common.future.SshFuture; import org.apache.sshd.common.future.SshFutureListener; /** - * Provides some default implementations + * Provides some default implementations for managing channel/connection open/close state * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public abstract class AbstractCloseable extends IoBaseCloseable { public enum State { - Opened, Graceful, Immediate, Closed + /** Connection is open */ + Opened, + /** Connection is being closed gracefully */ + Graceful, + /** Connection is being terminated immediately */ + Immediate, + /** Connection is closed */ + Closed, + /* end */; } /** @@ -90,7 +98,7 @@ public abstract class AbstractCloseable extends IoBaseCloseable { } } else { if (debugEnabled) { - log.debug("close({})[Immediately] state already {}", this, state.get()); + log.debug("close({})[Immediately] state already {}", this, state); } } } else { @@ -119,7 +127,7 @@ public abstract class AbstractCloseable extends IoBaseCloseable { } } else { if (debugEnabled) { - log.debug("close({})[Graceful] state already {}", this, state.get()); + log.debug("close({})[Graceful] state already {}", this, state); } } } diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index 93832c6..3a51dda 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -318,7 +318,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C @Override public synchronized OpenFuture open() throws IOException { if (isClosing()) { - throw new SshException("Session has been closed"); + throw new SshException("Session has been closed: " + state); } openFuture = new DefaultOpenFuture(this.toString(), futureLock); @@ -399,6 +399,10 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C protected void doWriteData(byte[] data, int off, long len) throws IOException { // If we're already closing, ignore incoming data if (isClosing()) { + if (log.isDebugEnabled()) { + log.debug("doWriteData({}) ignored (len={}) channel state={}", this, len, state); + } + return; } ValidateUtils.checkTrue( diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index a5b788e..d8ebe09 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -780,19 +780,19 @@ public abstract class AbstractChannel @Override public IoWriteFuture writePacket(Buffer buffer) throws IOException { - Session s = getSession(); if (!isClosing()) { + Session s = getSession(); return s.writePacket(buffer); - } else { - if (log.isDebugEnabled()) { - log.debug("writePacket({}) Discarding output packet because channel is being closed", this); - } - return new AbstractIoWriteFuture(s.toString(), null) { - { - setValue(new EOFException("Channel is being closed")); - } - }; } + + if (log.isDebugEnabled()) { + log.debug("writePacket({}) Discarding output packet because channel state={}", this, state); + } + return new AbstractIoWriteFuture(toString(), null) { + { + setValue(new EOFException("Channel is being closed")); + } + }; } @Override @@ -921,29 +921,43 @@ public abstract class AbstractChannel protected abstract void doWriteExtendedData(byte[] data, int off, long len) throws IOException; - protected void sendEof() throws IOException { - if (isClosing()) { + /** + * Sends {@code SSH_MSG_CHANNEL_EOF} provided not already sent + * and current channel state allows it. + * + * @return The {@link IoWriteFuture} of the sent packet - {@code null} + * if message not sent due to channel state (or already sent) + * @throws IOException If failed to send the packet + */ + protected IoWriteFuture sendEof() throws IOException { + State channelState = state.get(); + // OK to send EOF if channel is open or being closed gracefully + if ((channelState != State.Opened) && (channelState != State.Graceful)) { if (log.isDebugEnabled()) { - log.debug("sendEof({}) already closing or closed", this); + log.debug("sendEof({}) already closing or closed - state={}", this, state); } - return; + return null; } if (eofSent.getAndSet(true)) { if (log.isDebugEnabled()) { - log.debug("sendEof({}) already sent", this); + log.debug("sendEof({}) already sent (state={})", this, channelState); } - return; + return null; } if (log.isDebugEnabled()) { - log.debug("sendEof({}) SSH_MSG_CHANNEL_EOF", this); + log.debug("sendEof({}) SSH_MSG_CHANNEL_EOF (state={})", this, channelState); } Session s = getSession(); Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_EOF, Short.SIZE); buffer.putInt(getRecipient()); - writePacket(buffer); + /* + * The default "writePacket" does not send packets if state + * is not open so we need to bypass it. + */ + return s.writePacket(buffer); } public boolean isEofSent() { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java index b0e715e..e2f1ebd 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java @@ -60,7 +60,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut @Override public synchronized IoWriteFuture writePacket(Buffer buffer) throws IOException { if (isClosing()) { - throw new EOFException("Closed"); + throw new EOFException("Closing: " + state); } IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId, buffer); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java index 8836ece..7673920 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java @@ -192,11 +192,8 @@ public class DefaultForwardingFilter ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local); Objects.requireNonNull(remote, "Remote address is null"); - if (isClosed()) { - throw new IllegalStateException("TcpipForwarder is closed"); - } - if (isClosing()) { - throw new IllegalStateException("TcpipForwarder is closing"); + if (isClosed() || isClosing()) { + throw new IllegalStateException("TcpipForwarder is closed or closing: " + state); } InetSocketAddress bound = null; @@ -453,11 +450,8 @@ public class DefaultForwardingFilter Objects.requireNonNull(local, "Local address is null"); ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local); - if (isClosed()) { - throw new IllegalStateException("TcpipForwarder is closed"); - } - if (isClosing()) { - throw new IllegalStateException("TcpipForwarder is closing"); + if (isClosed() || isClosing()) { + throw new IllegalStateException("TcpipForwarder is closed or closing: " + state); } SocksProxy proxy = null; diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java index c59cef2..77bb724 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java @@ -52,7 +52,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { @Override public void sessionCreated(IoSession session) throws Exception { if (isClosing()) { - throw new SshException("SocksProxy is closing or closed"); + throw new SshException("SocksProxy is closing or closed: " + state); } } @@ -65,7 +65,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { } @Override - public void messageReceived(final IoSession session, org.apache.sshd.common.util.Readable message) throws Exception { + public void messageReceived(IoSession session, org.apache.sshd.common.util.Readable message) throws Exception { Buffer buffer = new ByteArrayBuffer(message.available() + Long.SIZE, false); buffer.putBuffer(message); Proxy proxy = proxies.get(session); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index 54d9929..4468276 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -1942,7 +1942,7 @@ public abstract class AbstractSession extends SessionHelper { } return ValidateUtils.checkNotNull( - kexFutureHolder.get(), "No current KEX future on state=%s", kexState.get()); + kexFutureHolder.get(), "No current KEX future on state=%s", kexState); } /** @@ -1970,7 +1970,7 @@ public abstract class AbstractSession extends SessionHelper { protected KeyExchangeFuture requestNewKeysExchange() throws IOException, GeneralSecurityException { if (!kexState.compareAndSet(KexState.DONE, KexState.INIT)) { if (log.isDebugEnabled()) { - log.debug("requestNewKeysExchange({}) KEX state not DONE: {}", this, kexState.get()); + log.debug("requestNewKeysExchange({}) KEX state not DONE: {}", this, kexState); } return null;