This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit f58f006bc416963893c42ca034f2a9b1dfd1fe19 Author: Lyor Goldstein <lgoldst...@apache.org> AuthorDate: Sat Mar 20 10:01:42 2021 +0200 [SSHD-1123] Provide configurable behavior for ChannelAsyncOutputStream chunking behavior --- CHANGES.md | 1 + .../common/channel/ChannelAsyncOutputStream.java | 28 +++++++++++++++++++--- .../sshd/common/channel/ChannelOutputStream.java | 24 ++++++++++++------- .../org/apache/sshd/core/CoreModuleProperties.java | 18 ++++++++++++++ .../apache/sshd/server/channel/ChannelSession.java | 25 +++++++++++++------ 5 files changed, 78 insertions(+), 18 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a909dfd..8db2755 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -44,6 +44,7 @@ * [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added callbacks for client-side host-based authentication progress * [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added capability for interactive password authentication participation via UserInteraction * [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added capability for interactive key based authentication participation via UserInteraction +* [SSHD-1123](https://issues.apache.org/jira/browse/SSHD-1123) Add option to chunk data in ChannelAsyncOutputStream if window size is smaller than packet size * [SSHD-1125](https://issues.apache.org/jira/browse/SSHD-1125) Added mechanism to throttle pending write requests in BufferedIoOutputStream * [SSHD-1127](https://issues.apache.org/jira/browse/SSHD-1127) Added capability to register a custom receiver for SFTP STDERR channel raw or stream data * [SSHD-1133](https://issues.apache.org/jira/browse/SSHD-1133) Added capability to specify a custom charset for parsing incoming commands to the `ScpShell` diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java index 685d79e..eb2d5c5 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java @@ -42,16 +42,30 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut private final Object packetWriteId; private boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize; + /** + * @param channel The {@link Channel} through which the stream is communicating + * @param cmd Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or + * {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the + * output stream type + */ public ChannelAsyncOutputStream(Channel channel, byte cmd) { this(channel, cmd, false); } /** + * @param channel The {@link Channel} through which the stream is + * communicating + * @param cmd Either {@link SshConstants#SSH_MSG_CHANNEL_DATA + * SSH_MSG_CHANNEL_DATA} or + * {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA + * SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output stream + * type * @param sendChunkIfRemoteWindowIsSmallerThanPacketSize Determines the chunking behaviour, if the remote window * size is smaller than the packet size. Can be use to * establish compatibility with certain clients, that wait - * until the window size is 0 before adjusting it (see - * SSHD-1123). Default is false; + * until the window size is 0 before adjusting it. + * @see <A HREF= + * "https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A> */ public ChannelAsyncOutputStream(Channel channel, byte cmd, boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) { this.channelInstance = Objects.requireNonNull(channel, "No channel"); @@ -66,6 +80,15 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut return channelInstance; } + /** + * @return Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or + * {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output + * stream type + */ + public byte getCommandType() { + return cmd; + } + public void onWindowExpanded() throws IOException { doWriteIfPossible(true); } @@ -256,5 +279,4 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut public void setSendChunkIfRemoteWindowIsSmallerThanPacketSize(boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) { this.sendChunkIfRemoteWindowIsSmallerThanPacketSize = sendChunkIfRemoteWindowIsSmallerThanPacketSize; } - } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java index 42f4e28..2937c7d 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java @@ -42,12 +42,12 @@ import org.slf4j.Logger; * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public class ChannelOutputStream extends OutputStream implements java.nio.channels.Channel, ChannelHolder { + protected final Logger log; private final AbstractChannel channelInstance; private final ChannelStreamWriter packetWriter; private final Window remoteWindow; private final Duration maxWaitTimeout; - private final Logger log; private final byte cmd; private final boolean eofOnClose; private final byte[] b = new byte[1]; @@ -79,8 +79,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd); this.remoteWindow = Objects.requireNonNull(remoteWindow, "No remote window"); Objects.requireNonNull(maxWaitTimeout, "No maxWaitTimeout"); - ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), "Non-positive max. wait time: %s", - maxWaitTimeout.toString()); + ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), "Non-positive max. wait time: %s", maxWaitTimeout); this.maxWaitTimeout = maxWaitTimeout; this.log = Objects.requireNonNull(log, "No logger"); this.cmd = cmd; @@ -93,18 +92,27 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe return channelInstance; } - public boolean isEofOnClose() { - return eofOnClose; + /** + * @return Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or + * {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output + * stream type + */ + public byte getCommandType() { + return cmd; } - public void setNoDelay(boolean noDelay) { - this.noDelay = noDelay; + public boolean isEofOnClose() { + return eofOnClose; } public boolean isNoDelay() { return noDelay; } + public void setNoDelay(boolean noDelay) { + this.noDelay = noDelay; + } + @Override public boolean isOpen() { return !closedState.get(); @@ -185,7 +193,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe @Override public synchronized void flush() throws IOException { - AbstractChannel channel = getChannel(); + Channel channel = getChannel(); if (!isOpen()) { throw new SshChannelClosedException( channel.getId(), diff --git a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java index 88d9724..062166d 100644 --- a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java +++ b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java @@ -222,6 +222,24 @@ public final class CoreModuleProperties { public static final Property<Boolean> REQUEST_SUBSYSTEM_REPLY = Property.bool("channel-subsystem-want-reply", true); + /** + * If should chunk data sent via {@code ChannelAsyncOutputStream} when reported remote STDOUT stream window size is + * less than its packet size + * + * @see <A HREF="https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A> + */ + public static final Property<Boolean> ASYNC_SERVER_STDOUT_CHUNK_BELOW_WINDOW_SIZE + = Property.bool("server-async-stdout-chunk-below-window-size", false); + + /** + * If should chunk data sent via {@code ChannelAsyncOutputStream} when reported remote STDERR stream window size is + * less than its packet size + * + * @see <A HREF="https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A> + */ + public static final Property<Boolean> ASYNC_SERVER_STDERR_CHUNK_BELOW_WINDOW_SIZE + = Property.bool("server-async-stderr-chunk-below-window-size", false); + public static final Property<Integer> PROP_DHGEX_CLIENT_MIN_KEY = Property.integer("dhgex-client-min"); diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index 93821eb..963a14b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -722,10 +722,10 @@ public class ChannelSession extends AbstractServerChannel { if (command instanceof AsyncCommandStreamsAware) { asyncOut = new ChannelAsyncOutputStream( this, SshConstants.SSH_MSG_CHANNEL_DATA, - isSendChunkIfRemoteWindowIsSmallerThanPacketSize()); + isSendChunkIfRemoteWindowIsSmallerThanPacketSize(SshConstants.SSH_MSG_CHANNEL_DATA)); asyncErr = new ChannelAsyncOutputStream( this, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA, - isSendChunkIfRemoteWindowIsSmallerThanPacketSize()); + isSendChunkIfRemoteWindowIsSmallerThanPacketSize(SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA)); ((AsyncCommandStreamsAware) command).setIoOutputStream(asyncOut); ((AsyncCommandStreamsAware) command).setIoErrorStream(asyncErr); } else { @@ -922,11 +922,22 @@ public class ChannelSession extends AbstractServerChannel { /** * Chance for specializations to vary chunking behaviour depending on the SFTP client version. * - * @return {@code true} if chunk data sent via {@link ChannelAsyncOutputStream} when reported remote window size is - * less than its packet size - * @see ChannelAsyncOutputStream#ChannelAsyncOutputStream(Channel, byte, boolean) + * @param cmd Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or + * {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA + * SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output stream type + * @return {@code true} if should chunk data sent via {@link ChannelAsyncOutputStream} + * when reported remote window size is less than its packet size + * @see ChannelAsyncOutputStream#ChannelAsyncOutputStream(Channel, byte, boolean) + * @throws UnsupportedOperationException if the command is neither of the supported ones */ - protected boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize() { - return false; + protected boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize(byte cmd) { + if (cmd == SshConstants.SSH_MSG_CHANNEL_DATA) { + return CoreModuleProperties.ASYNC_SERVER_STDOUT_CHUNK_BELOW_WINDOW_SIZE.getRequired(this); + } else if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) { + return CoreModuleProperties.ASYNC_SERVER_STDERR_CHUNK_BELOW_WINDOW_SIZE.getRequired(this); + } else { + throw new UnsupportedOperationException( + "Unsupported channel data stream command: " + SshConstants.getCommandMessageName(cmd & 0xFF)); + } } }