This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push: new 1cf3056 [SSHD-1079] Async mode on the local port forwarder (#167) 1cf3056 is described below commit 1cf3056cfddcdbf9aef6c96ef662f5e001477f97 Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Tue Sep 22 08:30:06 2020 +0200 [SSHD-1079] Async mode on the local port forwarder (#167) Disabled by default --- .../org/apache/sshd/common/forward/SocksProxy.java | 12 ++++++++--- .../sshd/common/forward/TcpipClientChannel.java | 25 +++++++++++++++++----- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java index bdb0b7f..f87d35c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.SshException; import org.apache.sshd.common.io.IoHandler; +import org.apache.sshd.common.io.IoOutputStream; import org.apache.sshd.common.io.IoSession; import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.util.buffer.Buffer; @@ -100,9 +101,14 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { } protected void onMessage(Buffer buffer) throws IOException { - OutputStream invertedIn = channel.getInvertedIn(); - invertedIn.write(buffer.array(), buffer.rpos(), buffer.available()); - invertedIn.flush(); + IoOutputStream asyncIn = channel.getAsyncIn(); + if (asyncIn != null) { + asyncIn.writePacket(buffer); + } else { + OutputStream invertedIn = channel.getInvertedIn(); + invertedIn.write(buffer.array(), buffer.rpos(), buffer.available()); + invertedIn.flush(); + } } @Override diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java index ae78076..c743948 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java @@ -33,8 +33,11 @@ import org.apache.sshd.common.Closeable; import org.apache.sshd.common.NamedResource; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; +import org.apache.sshd.common.channel.ChannelAsyncInputStream; +import org.apache.sshd.common.channel.ChannelAsyncOutputStream; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.channel.Window; +import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.io.IoSession; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.ValidateUtils; @@ -161,12 +164,24 @@ public class TcpipClientChannel extends AbstractClientChannel implements Forward @Override protected synchronized void doOpen() throws IOException { if (streaming == Streaming.Async) { - throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel"); + asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) { + @SuppressWarnings("synthetic-access") + @Override + protected CloseFuture doCloseGracefully() { + try { + sendEof(); + } catch (IOException e) { + getSession().exceptionCaught(e); + } + return super.doCloseGracefully(); + } + }; + asyncOut = new ChannelAsyncInputStream(this); + } else { + out = new ChannelOutputStream( + this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + invertedIn = out; } - - out = new ChannelOutputStream( - this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); - invertedIn = out; } @Override