This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push: new 72d6c00 [SSHD-1070] Limit the amount of data that is kept in memory for forwa… (#166) 72d6c00 is described below commit 72d6c0086d2e86060e82e39b531338473f5195d0 Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Tue Sep 22 08:29:22 2020 +0200 [SSHD-1070] Limit the amount of data that is kept in memory for forwa… (#166) * Add a switch to choose between sync / async modes for the TcpipServerChannel * Enable load tests --- .../java/org/apache/sshd/common/io/IoSession.java | 19 ++++++ sshd-core/pom.xml | 3 - .../apache/sshd/client/channel/ClientChannel.java | 11 +--- .../sshd/common/channel/SimpleIoOutputStream.java | 67 ++++++++++++++++++++ .../sshd/common/channel/StreamingChannel.java | 37 +++++++++++ .../apache/sshd/common/io/nio2/Nio2Session.java | 39 ++++++++++++ .../org/apache/sshd/core/CoreModuleProperties.java | 17 ++++++ .../sshd/server/forward/TcpipServerChannel.java | 71 +++++++++++++++++----- .../src/test/java/org/apache/sshd/LoadTest.java | 2 - 9 files changed, 237 insertions(+), 29 deletions(-) diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java index 20dbb46..f8de2b4 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java @@ -93,4 +93,23 @@ public interface IoSession extends ConnectionEndpointsIndicator, PacketWriter, C * @throws IOException If failed to shutdown the stream */ void shutdownOutputStream() throws IOException; + + /** + * Suspend read operations on this session. May do nothing if not supported by the session implementation. + * + * If the session usage includes a graceful shutdown with messages being exchanged, the caller needs to + * take care of resuming reading the input in order to actually be able to carry on the conversation with + * the peer. + */ + default void suspendRead() { + // Do nothing by default, but can be overriden by implementations + } + + /** + * Resume read operations on this session. May do nothing if not supported by the session implementation. + */ + default void resumeRead() { + // Do nothing by default, but can be overriden by implementations + } + } diff --git a/sshd-core/pom.xml b/sshd-core/pom.xml index 48bffcd..8b4db65 100644 --- a/sshd-core/pom.xml +++ b/sshd-core/pom.xml @@ -184,9 +184,6 @@ <configuration> <redirectTestOutputToFile>true</redirectTestOutputToFile> <reportsDirectory>${project.build.directory}/surefire-reports-nio2</reportsDirectory> - <excludes> - <exclude>**/*LoadTest.java</exclude> - </excludes> </configuration> </plugin> </plugins> diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java index 6bd15dc..7897ba7 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java @@ -31,6 +31,7 @@ import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.client.session.ClientSession; import org.apache.sshd.client.session.ClientSessionHolder; import org.apache.sshd.common.channel.Channel; +import org.apache.sshd.common.channel.StreamingChannel; import org.apache.sshd.common.io.IoInputStream; import org.apache.sshd.common.io.IoOutputStream; @@ -41,11 +42,7 @@ import org.apache.sshd.common.io.IoOutputStream; * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public interface ClientChannel extends Channel, ClientSessionHolder { - enum Streaming { - Async, - Sync - } +public interface ClientChannel extends Channel, StreamingChannel, ClientSessionHolder { @Override default ClientSession getClientSession() { @@ -57,10 +54,6 @@ public interface ClientChannel extends Channel, ClientSessionHolder { */ String getChannelType(); - Streaming getStreaming(); - - void setStreaming(Streaming streaming); - IoOutputStream getAsyncIn(); IoInputStream getAsyncOut(); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java new file mode 100644 index 0000000..6fee66a --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.channel; + +import java.io.IOException; + +import org.apache.sshd.common.io.AbstractIoWriteFuture; +import org.apache.sshd.common.io.IoOutputStream; +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.closeable.AbstractCloseable; +import org.apache.sshd.common.util.io.IoUtils; +import org.apache.sshd.server.forward.TcpipServerChannel; + +/** + * An implementation of {@link IoOutputStream} using a synchronous {@link ChannelOutputStream}. + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class SimpleIoOutputStream extends AbstractCloseable implements IoOutputStream { + + protected final ChannelOutputStream os; + + public SimpleIoOutputStream(ChannelOutputStream os) { + this.os = os; + } + + @Override + protected void doCloseImmediately() { + IoUtils.closeQuietly(os); + super.doCloseImmediately(); + } + + @Override + public IoWriteFuture writePacket(Buffer buffer) throws IOException { + os.write(buffer.array(), buffer.rpos(), buffer.available()); + os.flush(); + DefaultIoWriteFuture f = new DefaultIoWriteFuture(this, null); + f.setValue(true); + return f; + } + + protected static class DefaultIoWriteFuture extends AbstractIoWriteFuture { + + public DefaultIoWriteFuture(Object id, Object lock) { + super(id, lock); + } + + } + +} diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java new file mode 100644 index 0000000..e2d7b94 --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.channel; + +/** + * A channel that can be either configured to use synchronous or asynchrounous streams. + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public interface StreamingChannel { + + enum Streaming { + Async, + Sync + } + + Streaming getStreaming(); + + void setStreaming(Streaming streaming); + +} diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java index 84d0468..2200ba2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java @@ -70,6 +70,9 @@ public class Nio2Session extends AbstractCloseable implements IoSession { private final AtomicLong lastReadCycleStart = new AtomicLong(); private final AtomicLong writeCyclesCounter = new AtomicLong(); private final AtomicLong lastWriteCycleStart = new AtomicLong(); + private final Object suspendLock = new Object(); + private volatile boolean suspend; + private volatile Runnable readRunnable; public Nio2Session( Nio2Service service, FactoryManager manager, IoHandler handler, AsynchronousSocketChannel socket, @@ -382,7 +385,43 @@ public class Nio2Session extends AbstractCloseable implements IoSession { exceptionCaught(exc); } + @Override + public void suspendRead() { + log.trace("suspendRead({})", this); + boolean prev = suspend; + suspend = true; + if (!prev) { + log.debug("suspendRead({}) requesting read suspension", this); + } + } + + @Override + public void resumeRead() { + log.trace("resumeRead({})", this); + if (suspend) { + Runnable runnable; + synchronized (suspendLock) { + suspend = false; + runnable = readRunnable; + } + if (runnable != null) { + log.debug("resumeRead({}) resuming read", this); + runnable.run(); + } + } + } + protected void doReadCycle(ByteBuffer buffer, Nio2CompletionHandler<Integer, Object> completion) { + if (suspend) { + log.debug("doReadCycle({}) suspending reading", this); + synchronized (suspendLock) { + if (suspend) { + readRunnable = () -> doReadCycle(buffer, completion); + return; + } + } + } + AsynchronousSocketChannel socket = getSocket(); Duration readTimeout = CoreModuleProperties.NIO2_READ_TIMEOUT.getRequired(manager); readCyclesCounter.incrementAndGet(); diff --git a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java index 828bf72..d728c3e 100644 --- a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java +++ b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java @@ -678,6 +678,23 @@ public final class CoreModuleProperties { public static final Property<String> X11_BIND_HOST = Property.string("x11-fwd-bind-host", SshdSocketAddress.LOCALHOST_IPV4); + /** + * Configuration value for the {@link org.apache.sshd.server.forward.TcpipServerChannel} to control the higher + * theshold for the data to be buffered waiting to be sent. If the buffered data size reaches this value, the + * session will pause reading until the data length goes below the + * {@link #TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW} threshold. + */ + public static final Property<Long> TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH + = Property.long_("tcpip-server-channel-buffer-size-threshold-high", 1024 * 1024); + + /** + * The lower threshold. If not set, half the higher threshold will be used. + * + * @see #TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH + */ + public static final Property<Long> TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW + = Property.long_("tcpip-server-channel-buffer-size-threshold-low"); + private CoreModuleProperties() { throw new UnsupportedOperationException("No instance"); } diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java index a64eaf3..e14775a 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java @@ -23,6 +23,7 @@ import java.net.ConnectException; import java.net.SocketAddress; import java.util.Collections; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; import org.apache.sshd.client.future.DefaultOpenFuture; import org.apache.sshd.client.future.OpenFuture; @@ -34,17 +35,22 @@ import org.apache.sshd.common.channel.BufferedIoOutputStream; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelAsyncOutputStream; import org.apache.sshd.common.channel.ChannelFactory; +import org.apache.sshd.common.channel.ChannelOutputStream; +import org.apache.sshd.common.channel.SimpleIoOutputStream; +import org.apache.sshd.common.channel.StreamingChannel; import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.channel.exception.SshChannelOpenException; import org.apache.sshd.common.forward.Forwarder; import org.apache.sshd.common.forward.ForwardingTunnelEndpointsProvider; import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoConnectFuture; import org.apache.sshd.common.io.IoConnector; import org.apache.sshd.common.io.IoHandler; import org.apache.sshd.common.io.IoOutputStream; import org.apache.sshd.common.io.IoServiceFactory; import org.apache.sshd.common.io.IoSession; +import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.Readable; @@ -56,6 +62,7 @@ import org.apache.sshd.common.util.net.SshdSocketAddress; import org.apache.sshd.common.util.threads.CloseableExecutorService; import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; import org.apache.sshd.common.util.threads.ThreadUtils; +import org.apache.sshd.core.CoreModuleProperties; import org.apache.sshd.server.channel.AbstractServerChannel; import org.apache.sshd.server.forward.TcpForwardingFilter.Type; @@ -64,7 +71,7 @@ import org.apache.sshd.server.forward.TcpForwardingFilter.Type; * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public class TcpipServerChannel extends AbstractServerChannel implements ForwardingTunnelEndpointsProvider { +public class TcpipServerChannel extends AbstractServerChannel implements StreamingChannel, ForwardingTunnelEndpointsProvider { public abstract static class TcpipFactory implements ChannelFactory, ExecutorServiceCarrier { @@ -102,6 +109,8 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward private SshdSocketAddress tunnelExit; private SshdSocketAddress originatorAddress; private SocketAddress localAddress; + private final AtomicLong inFlightDataSize = new AtomicLong(); + private Streaming streaming = Streaming.Sync; public TcpipServerChannel(ForwardingFilter.Type type, CloseableExecutorService executor) { super("", Collections.emptyList(), executor); @@ -121,6 +130,16 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward } @Override + public Streaming getStreaming() { + return streaming; + } + + @Override + public void setStreaming(Streaming streaming) { + this.streaming = streaming; + } + + @Override public SshdSocketAddress getTunnelEntrance() { return tunnelEntrance; } @@ -195,19 +214,27 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward throw new RuntimeSshException(e); } - out = new BufferedIoOutputStream( - "tcpip channel", new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) { - @SuppressWarnings("synthetic-access") - @Override - protected CloseFuture doCloseGracefully() { - try { - sendEof(); - } catch (IOException e) { - session.exceptionCaught(e); - } - return super.doCloseGracefully(); + if (streaming == Streaming.Async) { + out = new BufferedIoOutputStream( + "tcpip channel", new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) { + @SuppressWarnings("synthetic-access") + @Override + protected CloseFuture doCloseGracefully() { + try { + sendEof(); + } catch (IOException e) { + session.exceptionCaught(e); } - }); + return super.doCloseGracefully(); + } + }); + } else { + this.out = new SimpleIoOutputStream(new ChannelOutputStream( + this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true)); + + } + long thresholdHigh = CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH.getRequired(this); + long thresholdLow = CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW.get(this).orElse(thresholdHigh / 2); IoHandler handler = new IoHandler() { @Override @SuppressWarnings("synthetic-access") @@ -217,9 +244,23 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward log.debug("doInit({}) Ignoring write to channel in CLOSING state", TcpipServerChannel.this); } } else { - Buffer buffer = new ByteArrayBuffer(message.available(), false); + int length = message.available(); + Buffer buffer = new ByteArrayBuffer(length, false); buffer.putBuffer(message); - out.writePacket(buffer); + long total = inFlightDataSize.addAndGet(length); + if (total > thresholdHigh) { + session.suspendRead(); + } + IoWriteFuture ioWriteFuture = out.writePacket(buffer); + ioWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() { + @Override + public void operationComplete(IoWriteFuture future) { + long total = inFlightDataSize.addAndGet(-length); + if (total <= thresholdLow) { + session.resumeRead(); + } + } + }); } } diff --git a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java index 4948fe0..7afae32 100644 --- a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java @@ -127,8 +127,6 @@ public class LoadTest extends BaseTestSupport { try (SshClient client = setupTestFullSupportClient()) { CoreModuleProperties.MAX_PACKET_SIZE.set(client, 1024L * 16); CoreModuleProperties.WINDOW_SIZE.set(client, 1024L * 8); - client.setKeyExchangeFactories(Collections.singletonList(ClientBuilder.DH2KEX.apply(BuiltinDHFactories.dhg1))); - client.setCipherFactories(Collections.singletonList(BuiltinCiphers.blowfishcbc)); client.start(); try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(CONNECT_TIMEOUT).getSession()) {