Repository: mina-sshd Updated Branches: refs/heads/master 1f3006b93 -> e0a6b8da2
[SSHD-768] Introduced PacketWriter + ChannelStreamPacketWriterResolver + ThrottlingPacketWriter hooks for user intervention Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e0a6b8da Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e0a6b8da Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e0a6b8da Branch: refs/heads/master Commit: e0a6b8da2b9b6af2fa6ff23fdcd76188ab1db2fe Parents: 1f3006b Author: Goldstein Lyor <l...@c-b4.com> Authored: Wed Sep 6 09:22:21 2017 +0300 Committer: Goldstein Lyor <l...@c-b4.com> Committed: Thu Sep 7 08:51:17 2017 +0300 ---------------------------------------------------------------------- README.md | 14 ++ .../throttle/ThrottlingPacketWriter.java | 217 +++++++++++++++++++ .../throttle/ThrottlingPacketWriterTest.java | 129 +++++++++++ .../org/apache/sshd/common/BaseBuilder.java | 10 +- .../org/apache/sshd/common/FactoryManager.java | 2 + .../sshd/common/channel/AbstractChannel.java | 29 ++- .../common/channel/BufferedIoOutputStream.java | 24 +- .../org/apache/sshd/common/channel/Channel.java | 4 + .../channel/ChannelAsyncOutputStream.java | 36 ++- .../common/channel/ChannelOutputStream.java | 31 ++- .../ChannelStreamPacketWriterResolver.java | 45 ++++ ...hannelStreamPacketWriterResolverManager.java | 44 ++++ .../apache/sshd/common/forward/SocksProxy.java | 18 +- .../sshd/common/forward/TcpipClientChannel.java | 2 +- .../common/helpers/AbstractFactoryManager.java | 12 + .../apache/sshd/common/io/IoOutputStream.java | 13 +- .../org/apache/sshd/common/io/IoSession.java | 27 +-- .../org/apache/sshd/common/io/PacketWriter.java | 43 ++++ .../apache/sshd/common/io/mina/MinaSession.java | 2 +- .../apache/sshd/common/io/nio2/Nio2Session.java | 2 +- .../org/apache/sshd/common/session/Session.java | 17 +- .../common/session/helpers/AbstractSession.java | 33 ++- .../sshd/server/forward/TcpipServerChannel.java | 2 +- .../server/session/AbstractServerSession.java | 5 +- .../sshd/server/x11/ChannelForwardedX11.java | 2 +- .../java/org/apache/sshd/WindowAdjustTest.java | 38 +++- .../java/org/apache/sshd/client/ClientTest.java | 21 +- .../apache/sshd/common/channel/WindowTest.java | 2 +- .../session/helpers/AbstractSessionTest.java | 8 +- .../sshd/server/ServerProxyAcceptorTest.java | 2 +- .../sshd/server/channel/ChannelSessionTest.java | 6 +- .../sshd/util/test/AsyncEchoShellFactory.java | 2 +- .../org/apache/sshd/util/test/BogusChannel.java | 13 ++ 33 files changed, 735 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index de1e2dd..0f53709 100644 --- a/README.md +++ b/README.md @@ -482,6 +482,17 @@ injecting some dynamic data before the command is `start()`-ed. associated with this command. +### Data stream(s) sizing consideration + +Some commands may send/receive large amounts of data over their STDIN/STDOUT/STDERR streams. Since (by default) the sending mechanism in SSHD is +**asynchronous** it may cause _Out of memory_ errors due to one side (client/server) generating `SSH_MSG_CHANNEL_DATA` or `SSH_MSG_CHANNEL_EXTENDED_DATA` +at a much higher rate than the other side can consume. This leads to a build-up of a packets backlog that eventually consumes all available memory +(as described in [SSHD-754](https://issues.apache.org/jira/browse/SSHD-754) and [SSHD-768](https://issues.apache.org/jira/browse/SSHD-768)). As of +version 1.7 one can register a `ChannelStreamPacketWriterResolver` at the client/server/session/channel level that can enable the user to replace +the raw channel with some throttling mechanism that will be used for stream packets. Such an (experimental) example is the `ThrottlingPacketWriter` +available in the `sshd-contrib` module. **Note:** if the `ChannelStreamPacketWriterResolver` returns a wrapper instance instead of a `Channel` then +it will be **closed** automatically when the stream using it is closed. + ## SCP Besides the `ScpTransferEventListener`, the SCP module also uses a `ScpFileOpener` instance in order to access @@ -1220,6 +1231,9 @@ reading data - and those that modify it * `ProxyProtocolAcceptor` - A working prototype to support the PROXY protocol as described in [HAProxy Documentation](http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) +* `ThrottlingPacketWriter` - An example of a way to overcome big window sizes when sending data - as described in [SSHD-754](https://issues.apache.org/jira/browse/SSHD-754) +and [SSHD-768](https://issues.apache.org/jira/browse/SSHD-768) + # Builtin components Below is the list of builtin components: http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java ---------------------------------------------------------------------- diff --git a/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java b/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java new file mode 100644 index 0000000..3efcc85 --- /dev/null +++ b/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.channel.throttle; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.channels.ClosedSelectorException; +import java.nio.channels.InterruptedByTimeoutException; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.sshd.common.PropertyResolver; +import org.apache.sshd.common.PropertyResolverUtils; +import org.apache.sshd.common.channel.Channel; +import org.apache.sshd.common.future.SshFutureListener; +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.io.PacketWriter; +import org.apache.sshd.common.util.ValidateUtils; +import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.logging.AbstractLoggingBean; + +/** + * A {@link PacketWriter} delegate implementation that "throttles" the + * output by having a limit on the outstanding packets that have not been sent + * yet. The {@link #writePacket(Buffer) writePacket} implementation make sure + * that the limit has not been exceeded - if so, then it waits until pending + * packets have been successfully sent before sending the next packet. + * + * <B>Note:</B> {@link #close() closing} the throttler does not close the delegate writer + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class ThrottlingPacketWriter extends AbstractLoggingBean implements PacketWriter, SshFutureListener<IoWriteFuture> { + /** Timeout (seconds) for throttling packet writer to wait for pending packets send */ + public static final String WAIT_TIME_PROP = "packet-writer-wait-time"; + + /** Default value for {@value #WAIT_TIME_PROP} if none specified */ + public static final long DEFAULT_MAX_WAIT_TIME = 30L; + + /** Max. pending packets count */ + public static final String MAX_PEND_COUNT = "packet-writer-max-pend-count"; + + /** Default value for {@value #MAX_PEND_COUNT} if none specified */ + public static final int DEFAULT_PEND_COUNT_MAX = 4096; + + private final boolean traceEnabled; + private final PacketWriter delegate; + private final int maxPendingPackets; + private final long maxWait; + private final AtomicBoolean open = new AtomicBoolean(true); + private final AtomicInteger availableCount; + + public ThrottlingPacketWriter(Channel channel) { + this(channel, channel); + } + + public ThrottlingPacketWriter(PacketWriter delegate, PropertyResolver resolver) { + this(delegate, PropertyResolverUtils.getIntProperty(resolver, MAX_PEND_COUNT, DEFAULT_PEND_COUNT_MAX), + TimeUnit.SECONDS, PropertyResolverUtils.getLongProperty(resolver, WAIT_TIME_PROP, DEFAULT_MAX_WAIT_TIME)); + } + + public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, TimeUnit waitUnit, long waitCount) { + this(delegate, maxPendingPackets, waitUnit.toMillis(waitCount)); + } + + public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, long maxWait) { + this.delegate = Objects.requireNonNull(delegate, "No delegate provided"); + ValidateUtils.checkTrue(maxPendingPackets > 0, "Invalid pending packets limit: %d", maxPendingPackets); + this.maxPendingPackets = maxPendingPackets; + this.availableCount = new AtomicInteger(maxPendingPackets); + ValidateUtils.checkTrue(maxWait > 0L, "Invalid max. pending wait time: %d", maxWait); + this.maxWait = maxWait; + this.traceEnabled = log.isTraceEnabled(); + } + + public PacketWriter getDelegate() { + return delegate; + } + + public int getMaxPendingPackets() { + return maxPendingPackets; + } + + public int getAvailablePacketsCount() { + return availableCount.get(); + } + + public long getMaxWait() { + return maxWait; + } + + @Override + public boolean isOpen() { + return open.get(); + } + + @Override + public IoWriteFuture writePacket(Buffer buffer) throws IOException { + if (!isOpen()) { + throw new ClosedSelectorException(); + } + + long remainWait = getMaxWait(); + int available; + synchronized (availableCount) { + while (availableCount.get() == 0) { + long waitStart = System.currentTimeMillis(); + try { + availableCount.wait(remainWait); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted after " + (System.currentTimeMillis() - waitStart) + " msec."); + } + long waitDuration = System.currentTimeMillis() - waitStart; + if (waitDuration <= 0L) { + waitDuration = 1L; + } + remainWait -= waitDuration; + if (remainWait <= 0L) { + throw new InterruptedByTimeoutException(); + } + } + + available = availableCount.decrementAndGet(); + } + + if (traceEnabled) { + log.trace("writePacket({}) available={} after {} msec.", this, available, getMaxWait() - remainWait); + } + if (available < 0) { + throw new EOFException("Negative available packets count: " + available); + } + + PacketWriter writer = getDelegate(); + return writer.writePacket(buffer).addListener(this); + } + + @Override + public void operationComplete(IoWriteFuture future) { + if (future.isDone()) { + if (future.isWritten()) { + int available; + synchronized (availableCount) { + available = isOpen() ? availableCount.incrementAndGet() : Integer.MIN_VALUE; + availableCount.notifyAll(); + } + + if (available > 0) { + if (traceEnabled) { + log.trace("operationComplete({}) available={}", this, available); + } + return; + } + + /* + * If non-positive it may be that close has been signaled or mis-count - in any case, don't take any chances + */ + log.error("operationComplete({}) invalid available count: {}", this, available); + } else { + Throwable err = future.getException(); + log.error("operationComplete({}) Error ({}) signalled: {}", this, err.getClass().getSimpleName(), err.getMessage()); + } + } else { + log.error("operationComplete({}) Incomplete future signalled: {}", this, future); + } + + try { + close(); + } catch (IOException e) { + log.warn("operationComplete({}) unexpected ({}) due to close: {}", + this, e.getClass().getSimpleName(), e.getMessage()); + } + } + + @Override + public void close() throws IOException { + if (open.getAndSet(false)) { + if (log.isDebugEnabled()) { + log.debug("close({}) closing"); + } + } + + // Do it again if called - no harm + synchronized (availableCount) { + availableCount.set(-1); + availableCount.notifyAll(); + } + } + + @Override + public String toString() { + return getClass().getSimpleName() + + "[delegate=" + getDelegate() + + ", maxWait=" + getMaxWait() + + ", maxPending=" + getMaxPendingPackets() + + ", available=" + getAvailablePacketsCount() + + "]"; + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java ---------------------------------------------------------------------- diff --git a/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java b/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java new file mode 100644 index 0000000..94958c8 --- /dev/null +++ b/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.channel.throttle; + +import java.io.IOException; +import java.io.StreamCorruptedException; +import java.nio.channels.ClosedSelectorException; +import java.nio.channels.InterruptedByTimeoutException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.sshd.common.channel.IoWriteFutureImpl; +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.io.PacketWriter; +import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.apache.sshd.util.test.BaseTestSupport; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +/** + * TODO Add javadoc + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ThrottlingPacketWriterTest extends BaseTestSupport { + public ThrottlingPacketWriterTest() { + super(); + } + + @Test + public void testThrottlerWaitsUntilPacketSendSignalled() throws IOException { + try (ThrottlingPacketWriter throttler = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) { + int maxSize = throttler.getMaxPendingPackets(); + List<IoWriteFuture> pendingWrites = new ArrayList<>(maxSize); + Buffer buf = new ByteArrayBuffer(Byte.SIZE); + for (int index = maxSize; index > 0; index--) { + IoWriteFuture future = throttler.writePacket(buf); + pendingWrites.add(future); + assertEquals("Mismatched available packets count", index - 1, throttler.getAvailablePacketsCount()); + } + + assertEquals("Not all available packet window size exhausted", 0, throttler.getAvailablePacketsCount()); + try { + IoWriteFuture future = throttler.writePacket(buf); + fail("Unexpected extra packet success: " + future); + } catch (InterruptedByTimeoutException e) { + // expected + } + + int sendSize = pendingWrites.size() / 2; + for (int index = 0; index < sendSize; index++) { + IoWriteFutureImpl future = (IoWriteFutureImpl) pendingWrites.get(index); + future.setValue(Boolean.TRUE); + assertEquals("Mismatched available packets count", index + 1, throttler.getAvailablePacketsCount()); + } + + for (int index = throttler.getAvailablePacketsCount(); index < maxSize; index++) { + throttler.writePacket(buf); + } + } + } + + @Test(expected = ClosedSelectorException.class) + public void testThrottlerDoesNotSendIfClosed() throws IOException { + try (PacketWriter throttler = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) { + assertTrue("Throttler not marked as open", throttler.isOpen()); + throttler.close(); + assertFalse("Throttler not marked as closed", throttler.isOpen()); + + IoWriteFuture future = throttler.writePacket(new ByteArrayBuffer(Byte.SIZE)); + fail("Unexpected success: " + future); + } + } + + @Test(expected = ClosedSelectorException.class) + public void testThrottlerStopsSendingIfExceptionSignaledOnFutureOperationCompletion() throws IOException { + try (PacketWriter throttler = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) { + assertTrue("Throttler not marked as open", throttler.isOpen()); + + IoWriteFutureImpl futureImpl = (IoWriteFutureImpl) throttler.writePacket(new ByteArrayBuffer(Byte.SIZE)); + futureImpl.setValue(new StreamCorruptedException(getCurrentTestName())); + assertFalse("Throttler not marked as closed", throttler.isOpen()); + + IoWriteFuture future = throttler.writePacket(new ByteArrayBuffer(Byte.SIZE)); + fail("Unexpected success: " + future); + } + } + + private static class MockPacketWriter implements PacketWriter { + MockPacketWriter() { + super(); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException("Unexpected close call"); + } + + @Override + public IoWriteFuture writePacket(Buffer buffer) throws IOException { + return new IoWriteFutureImpl(buffer); + } + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java index 250dc63..d7b1732 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.RequestHandler; +import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver; import org.apache.sshd.common.cipher.BuiltinCiphers; import org.apache.sshd.common.cipher.Cipher; import org.apache.sshd.common.compression.Compression; @@ -141,6 +142,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder protected ForwardingFilterFactory forwarderFactory; protected List<RequestHandler<ConnectionService>> globalRequestHandlers; protected ForwardingFilter forwardingFilter; + protected ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver; public BaseBuilder() { super(); @@ -238,7 +240,12 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder return me(); } - public T build(final boolean isFillWithDefaultValues) { + public S channelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) { + channelStreamPacketWriterResolver = resolver; + return me(); + } + + public T build(boolean isFillWithDefaultValues) { if (isFillWithDefaultValues) { fillWithDefaultValues(); } @@ -256,6 +263,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder ssh.setForwardingFilter(forwardingFilter); ssh.setForwarderFactory(forwarderFactory); ssh.setGlobalRequestHandlers(globalRequestHandlers); + ssh.setChannelStreamPacketWriterResolver(channelStreamPacketWriterResolver); return ssh; } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java index 71e06da..3aef39b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java @@ -26,6 +26,7 @@ import org.apache.sshd.agent.SshAgentFactory; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelListenerManager; import org.apache.sshd.common.channel.RequestHandler; +import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager; import org.apache.sshd.common.file.FileSystemFactory; import org.apache.sshd.common.forward.ForwardingFilterFactory; import org.apache.sshd.common.forward.PortForwardingEventListenerManager; @@ -51,6 +52,7 @@ public interface FactoryManager SessionListenerManager, ReservedSessionMessagesManager, ChannelListenerManager, + ChannelStreamPacketWriterResolverManager, PortForwardingEventListenerManager, AttributeStore, PropertyResolver { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index 8f32a2b..2a6c0e9 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -40,6 +40,8 @@ import org.apache.sshd.common.Closeable; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.PropertyResolver; import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver; +import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.DefaultCloseFuture; import org.apache.sshd.common.future.SshFutureListener; @@ -98,6 +100,8 @@ public abstract class AbstractChannel private final Window localWindow; private final Window remoteWindow; + private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver; + /** * A {@link Map} of sent requests - key = request name, value = timestamp when * request was sent. @@ -198,6 +202,27 @@ public abstract class AbstractChannel shutdownExecutor = shutdown; } + @Override + public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() { + return channelStreamPacketWriterResolver; + } + + @Override + public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) { + channelStreamPacketWriterResolver = resolver; + } + + @Override + public ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() { + ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver(); + if (resolver != null) { + return resolver; + } + + ChannelStreamPacketWriterResolverManager manager = getSession(); + return manager.resolveChannelStreamPacketWriterResolver(); + } + /** * Add a channel request to the tracked pending ones if reply is expected * @@ -760,7 +785,8 @@ public abstract class AbstractChannel super.doCloseImmediately(); } - protected IoWriteFuture writePacket(Buffer buffer) throws IOException { + @Override + public IoWriteFuture writePacket(Buffer buffer) throws IOException { if (!isClosing()) { Session s = getSession(); return s.writePacket(buffer); @@ -880,6 +906,7 @@ public abstract class AbstractChannel Window wRemote = getRemoteWindow(); wRemote.expand(window); + notifyStateChanged("SSH_MSG_CHANNEL_WINDOW_ADJUST"); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java index 6bacdc8..6737a82 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java @@ -18,6 +18,7 @@ */ package org.apache.sshd.common.channel; +import java.io.EOFException; import java.io.IOException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -43,22 +44,22 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable implements Io } @Override - public IoWriteFuture write(Buffer buffer) { - IoWriteFutureImpl future = new IoWriteFutureImpl(buffer); + public IoWriteFuture writePacket(Buffer buffer) throws IOException { if (isClosing()) { - future.setValue(new IOException("Closed")); - } else { - writes.add(future); - startWriting(); + throw new EOFException("Closed"); } + + IoWriteFutureImpl future = new IoWriteFutureImpl(buffer); + writes.add(future); + startWriting(); return future; } - protected void startWriting() { + protected void startWriting() throws IOException { final IoWriteFutureImpl future = writes.peek(); if (future != null) { if (currentWrite.compareAndSet(null, future)) { - out.write(future.getBuffer()).addListener(new SshFutureListener<IoWriteFuture>() { + out.writePacket(future.getBuffer()).addListener(new SshFutureListener<IoWriteFuture>() { @Override public void operationComplete(IoWriteFuture f) { if (f.isWritten()) { @@ -69,10 +70,15 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable implements Io finishWrite(); } + @SuppressWarnings("synthetic-access") private void finishWrite() { writes.remove(future); currentWrite.compareAndSet(future, null); - startWriting(); + try { + startWriting(); + } catch (IOException e) { + log.error("finishWrite({}) failed ({}) re-start writing", out, e.getClass().getSimpleName()); + } } }); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java index fa5e226..2061d26 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java @@ -26,6 +26,8 @@ import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.AttributeStore; import org.apache.sshd.common.Closeable; import org.apache.sshd.common.PropertyResolver; +import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager; +import org.apache.sshd.common.io.PacketWriter; import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.GenericUtils; @@ -41,6 +43,8 @@ public interface Channel extends ChannelListenerManager, PropertyResolver, AttributeStore, + PacketWriter, + ChannelStreamPacketWriterResolverManager, Closeable { // Known types of channels String CHANNEL_EXEC = "exec"; http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java index e0da5c0..e525355 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java @@ -18,6 +18,7 @@ */ package org.apache.sshd.common.channel; +import java.io.EOFException; import java.io.IOException; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -27,19 +28,21 @@ import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoOutputStream; import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.io.PacketWriter; import org.apache.sshd.common.io.WritePendingException; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.closeable.AbstractCloseable; public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream, ChannelHolder { - private final Channel channelInstance; + private final PacketWriter packetWriter; private final byte cmd; private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<>(); public ChannelAsyncOutputStream(Channel channel, byte cmd) { this.channelInstance = Objects.requireNonNull(channel, "No channel"); + this.packetWriter = channelInstance.resolveChannelStreamPacketWriter(channel, cmd); this.cmd = cmd; } @@ -53,20 +56,33 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut } @Override - public synchronized IoWriteFuture write(Buffer buffer) { - IoWriteFutureImpl future = new IoWriteFutureImpl(buffer); + public synchronized IoWriteFuture writePacket(Buffer buffer) throws IOException { if (isClosing()) { - future.setValue(new IOException("Closed")); - } else { - if (!pendingWrite.compareAndSet(null, future)) { - throw new WritePendingException("No write pending future"); - } - doWriteIfPossible(false); + throw new EOFException("Closed"); + } + + IoWriteFutureImpl future = new IoWriteFutureImpl(buffer); + if (!pendingWrite.compareAndSet(null, future)) { + throw new WritePendingException("No write pending future"); } + doWriteIfPossible(false); return future; } @Override + protected void preClose() { + if (!(packetWriter instanceof Channel)) { + try { + packetWriter.close(); + } catch (IOException e) { + log.error("preClose({}) Failed ({}) to pre-close packet writer: {}", this, e.getClass().getSimpleName(), e.getMessage()); + } + } + + super.preClose(); + } + + @Override protected CloseFuture doCloseGracefully() { return builder().when(pendingWrite.get()).build().close(false); } @@ -114,7 +130,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut try { ChannelAsyncOutputStream stream = this; - IoWriteFuture writeFuture = s.writePacket(buf); + IoWriteFuture writeFuture = packetWriter.writePacket(buf); writeFuture.addListener(new SshFutureListener<IoWriteFuture>() { @Override public void operationComplete(IoWriteFuture f) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java index 6fdb2d1..e52865b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; +import org.apache.sshd.common.io.PacketWriter; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; @@ -44,6 +45,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe public static final long DEFAULT_WAIT_FOR_SPACE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L); private final AbstractChannel channelInstance; + private final PacketWriter packetWriter; private final Window remoteWindow; private final long maxWaitTimeout; private final Logger log; @@ -62,6 +64,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, long maxWaitTimeout, Logger log, byte cmd, boolean eofOnClose) { this.channelInstance = Objects.requireNonNull(channel, "No channel"); + this.packetWriter = channelInstance.resolveChannelStreamPacketWriter(channel, cmd); this.remoteWindow = Objects.requireNonNull(remoteWindow, "No remote window"); ValidateUtils.checkTrue(maxWaitTimeout > 0L, "Non-positive max. wait time: %d", maxWaitTimeout); this.maxWaitTimeout = maxWaitTimeout; @@ -215,7 +218,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe if (log.isTraceEnabled()) { log.trace("flush({}) send {} len={}", channel, SshConstants.getCommandMessageName(cmd), length); } - channel.writePacket(buf); + packetWriter.writePacket(buf); } } catch (WindowClosedException e) { if (!closedState.getAndSet(true)) { @@ -237,17 +240,25 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe @Override public synchronized void close() throws IOException { - if (isOpen()) { - if (log.isTraceEnabled()) { - log.trace("close({}) closing", this); - } + if (!isOpen()) { + return; + } - try { - flush(); + if (log.isTraceEnabled()) { + log.trace("close({}) closing", this); + } - if (isEofOnClose()) { - AbstractChannel channel = getChannel(); - channel.sendEof(); + try { + flush(); + + if (isEofOnClose()) { + AbstractChannel channel = getChannel(); + channel.sendEof(); + } + } finally { + try { + if (!(packetWriter instanceof Channel)) { + packetWriter.close(); } } finally { closedState.set(true); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java new file mode 100644 index 0000000..d5d7821 --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.channel.throttle; + +import org.apache.sshd.common.channel.Channel; +import org.apache.sshd.common.io.PacketWriter; + +/** + * A special mechanism that enables users to intervene in the way packets are + * sent from {@code ChannelOutputStream}-s - e.g., by introducing throttling + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +@FunctionalInterface +public interface ChannelStreamPacketWriterResolver { + /** + * An identity resolver - i.e., no special intervention - simply use the channel itself + */ + ChannelStreamPacketWriterResolver NONE = (channel, cmd) -> channel; + + /** + * @param channel The original {@link Channel} + * @param cmd The {@code SSH_MSG_CHANNEL_DATA} or {@code SSH_MSG_CHANNEL_EXTENDED_DATA} + * command that triggered the resolution + * @return The {@link PacketWriter} to use - <B>Note:</B> if the return value is + * not a {@link Channel} then it will be closed when the stream is closed + */ + PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd); +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java new file mode 100644 index 0000000..e50eeb8 --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.channel.throttle; + +import org.apache.sshd.common.channel.Channel; +import org.apache.sshd.common.io.PacketWriter; + +/** + * TODO Add javadoc + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public interface ChannelStreamPacketWriterResolverManager extends ChannelStreamPacketWriterResolver { + ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver(); + + void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver); + + default ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() { + ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver(); + return (resolver == null) ? ChannelStreamPacketWriterResolver.NONE : resolver; + } + + @Override + default PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) { + ChannelStreamPacketWriterResolver resolver = resolveChannelStreamPacketWriterResolver(); + return (resolver == null) ? channel : resolver.resolveChannelStreamPacketWriter(channel, cmd); + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java index 893d145..45e157e 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java @@ -177,7 +177,13 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { buffer.putByte((byte) 0x00); buffer.putByte((byte) 0x00); buffer.putByte((byte) 0x00); - session.write(buffer); + try { + session.writePacket(buffer); + } catch (IOException e) { + // TODO Auto-generated catch block + log.error("Failed ({}) to send channel open packet for {}: {}", e.getClass().getSimpleName(), channel, e.getMessage()); + throw new IllegalStateException("Failed to send packet", e); + } } private String getNTString(Buffer buffer) { @@ -214,7 +220,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { buffer = new ByteArrayBuffer(Byte.SIZE, false); buffer.putByte((byte) 0x05); buffer.putByte((byte) (foundNoAuth ? 0x00 : 0xFF)); - session.write(buffer); + session.writePacket(buffer); if (!foundNoAuth) { throw new IllegalStateException("Received socks5 greeting without NoAuth method"); } else { @@ -284,7 +290,13 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { response.putByte((byte) 0x00); } response.wpos(wpos); - session.write(response); + try { + session.writePacket(response); + } catch (IOException e) { + // TODO Auto-generated catch block + log.error("Failed ({}) to send channel open response for {}: {}", e.getClass().getSimpleName(), channel, e.getMessage()); + throw new IllegalStateException("Failed to send packet", e); + } } private String getBLString(Buffer buffer) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java index 98bb61e..1f73590 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java @@ -133,7 +133,7 @@ public class TcpipClientChannel extends AbstractClientChannel { Buffer buf = ByteArrayBuffer.getCompactClone(data, off, (int) len); Window wLocal = getLocalWindow(); wLocal.consumeAndCheck(len); - serverSession.write(buf); + serverSession.writePacket(buf); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java index 93175cf..4dcd0d2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java @@ -40,6 +40,7 @@ import org.apache.sshd.common.SyspropsMapWrapper; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.channel.RequestHandler; +import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver; import org.apache.sshd.common.config.VersionProperties; import org.apache.sshd.common.file.FileSystemFactory; import org.apache.sshd.common.forward.ForwardingFilterFactory; @@ -88,6 +89,7 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i private final Map<AttributeKey<?>, Object> attributes = new ConcurrentHashMap<>(); private PropertyResolver parentResolver = SyspropsMapWrapper.SYSPROPS_RESOLVER; private ReservedSessionMessagesHandler reservedSessionMessagesHandler; + private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver; protected AbstractFactoryManager() { ClassLoader loader = getClass().getClassLoader(); @@ -272,6 +274,16 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i } @Override + public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() { + return channelStreamPacketWriterResolver; + } + + @Override + public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) { + channelStreamPacketWriterResolver = resolver; + } + + @Override public void addSessionListener(SessionListener listener) { SessionListener.validateListener(listener); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java index e50a60c..e98e5f0 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java @@ -19,16 +19,7 @@ package org.apache.sshd.common.io; import org.apache.sshd.common.Closeable; -import org.apache.sshd.common.util.buffer.Buffer; - -public interface IoOutputStream extends Closeable { - - /** - * <B>NOTE:</B> the buffer must not be touched until the returned write future is completed. - * - * @param buffer the {@link Buffer} to use - * @return The {@link IoWriteFuture} for the operation - */ - IoWriteFuture write(Buffer buffer); +public interface IoOutputStream extends Closeable, PacketWriter { + // nothing extra } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java index 2861b80..3edb6fa 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java @@ -21,10 +21,8 @@ package org.apache.sshd.common.io; import java.net.SocketAddress; import org.apache.sshd.common.Closeable; -import org.apache.sshd.common.future.CloseFuture; -import org.apache.sshd.common.util.buffer.Buffer; -public interface IoSession extends Closeable { +public interface IoSession extends PacketWriter, Closeable { /** * @return a unique identifier for this session. Every session has its own @@ -61,29 +59,6 @@ public interface IoSession extends Closeable { SocketAddress getLocalAddress(); /** - * Write a packet on the socket. - * Multiple writes can be issued concurrently and will be queued. - * - * @param buffer The {@link Buffer} with the encoded packet data - * @return The {@link IoWriteFuture} for the request - */ - IoWriteFuture write(Buffer buffer); - - /** - * Closes this session immediately or after all queued write requests - * are flushed. This operation is asynchronous. Wait for the returned - * {@link CloseFuture} if you want to wait for the session actually closed. - * - * @param immediately {@code true} to close this session immediately. - * The pending write requests will simply be discarded. - * {@code false} to close this session after all queued - * write requests are flushed. - * @return The generated {@link CloseFuture} - */ - @Override - CloseFuture close(boolean immediately); - - /** * @return the {@link IoService} that created this session. */ IoService getService(); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/io/PacketWriter.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/PacketWriter.java b/sshd-core/src/main/java/org/apache/sshd/common/io/PacketWriter.java new file mode 100644 index 0000000..42cda19 --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/PacketWriter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.io; + +import java.io.IOException; +import java.nio.channels.Channel; + +import org.apache.sshd.common.util.buffer.Buffer; + +/** + * TODO Add javadoc + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public interface PacketWriter extends Channel { + /** + * Encode and send the given buffer. <B>Note:</B> for session packets the buffer has to have + * 5 bytes free at the beginning to allow the encoding to take place. Also, the write position + * of the buffer has to be set to the position of the last byte to write. + * + * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer must not be touched + * until the returned write future is completed. + * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent + * @throws IOException if an error occurred when encoding sending the packet + */ + IoWriteFuture writePacket(Buffer buffer) throws IOException; +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java index d69dc06..02dd74c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java @@ -132,7 +132,7 @@ public class MinaSession extends AbstractInnerCloseable implements IoSession { } @Override // NOTE !!! data buffer may NOT be re-used when method returns - at least until IoWriteFuture is signalled - public IoWriteFuture write(Buffer buffer) { + public IoWriteFuture writePacket(Buffer buffer) { return write(MinaSupport.asIoBuffer(buffer)); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java index 1384092..da07efd 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java @@ -130,7 +130,7 @@ public class Nio2Session extends AbstractCloseable implements IoSession { } @Override - public IoWriteFuture write(Buffer buffer) { + public IoWriteFuture writePacket(Buffer buffer) throws IOException { if (log.isDebugEnabled()) { log.debug("Writing {} bytes", buffer.available()); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java index 35bb96d..6e00e26 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java @@ -28,12 +28,14 @@ import org.apache.sshd.common.PropertyResolver; import org.apache.sshd.common.Service; import org.apache.sshd.common.auth.MutableUserHolder; import org.apache.sshd.common.channel.ChannelListenerManager; +import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager; import org.apache.sshd.common.cipher.CipherInformation; import org.apache.sshd.common.compression.CompressionInformation; import org.apache.sshd.common.forward.PortForwardingEventListenerManager; import org.apache.sshd.common.future.KeyExchangeFuture; import org.apache.sshd.common.io.IoSession; import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.io.PacketWriter; import org.apache.sshd.common.kex.KexFactoryManager; import org.apache.sshd.common.kex.KexProposalOption; import org.apache.sshd.common.kex.KeyExchange; @@ -52,12 +54,14 @@ public interface Session SessionListenerManager, ReservedSessionMessagesManager, ChannelListenerManager, + ChannelStreamPacketWriterResolverManager, PortForwardingEventListenerManager, FactoryManagerHolder, PropertyResolver, AttributeStore, Closeable, - MutableUserHolder { + MutableUserHolder, + PacketWriter { /** * Default prefix expected for the client / server identification string @@ -199,17 +203,6 @@ public interface Session IoWriteFuture sendIgnoreMessage(byte... data) throws IOException; /** - * Encode and send the given buffer. - * The buffer has to have 5 bytes free at the beginning to allow the encoding to take place. - * Also, the write position of the buffer has to be set to the position of the last byte to write. - * - * @param buffer the buffer to encode and send - * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent - * @throws IOException if an error occurred when encoding sending the packet - */ - IoWriteFuture writePacket(Buffer buffer) throws IOException; - - /** * Encode and send the given buffer with the specified timeout. * If the buffer could not be written before the timeout elapses, the returned * {@link org.apache.sshd.common.io.IoWriteFuture} will be set with a http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index ac2b272..5d01de4 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -55,6 +55,8 @@ import org.apache.sshd.common.Service; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; import org.apache.sshd.common.channel.ChannelListener; +import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver; +import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager; import org.apache.sshd.common.cipher.Cipher; import org.apache.sshd.common.cipher.CipherInformation; import org.apache.sshd.common.compression.Compression; @@ -240,6 +242,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen */ private final Map<AttributeKey<?>, Object> attributes = new ConcurrentHashMap<>(); private ReservedSessionMessagesHandler reservedSessionMessagesHandler; + private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver; /** * Create a new session. @@ -433,6 +436,27 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen signalSessionEvent(SessionListener.Event.Authenticated); } + @Override + public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() { + return channelStreamPacketWriterResolver; + } + + @Override + public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) { + channelStreamPacketWriterResolver = resolver; + } + + @Override + public ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() { + ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver(); + if (resolver != null) { + return resolver; + } + + ChannelStreamPacketWriterResolverManager manager = getFactoryManager(); + return manager.resolveChannelStreamPacketWriterResolver(); + } + /** * <P>Main input point for the MINA framework.</P> * @@ -1063,11 +1087,11 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen synchronized (encodeLock) { if (ignoreBuf != null) { ignoreBuf = encode(ignoreBuf); - ioSession.write(ignoreBuf); + ioSession.writePacket(ignoreBuf); } buffer = encode(buffer); - future = ioSession.write(buffer); + future = ioSession.writePacket(buffer); } return future; @@ -1447,13 +1471,14 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen * @param ident our identification to send * @return {@link IoWriteFuture} that can be used to wait for notification * that identification has been send + * @throws IOException If failed to send the packet */ - protected IoWriteFuture sendIdentification(String ident) { + protected IoWriteFuture sendIdentification(String ident) throws IOException { byte[] data = (ident + "\r\n").getBytes(StandardCharsets.UTF_8); if (log.isDebugEnabled()) { log.debug("sendIdentification({}): {}", this, ident.replace('\r', '|').replace('\n', '|')); } - return ioSession.write(new ByteArrayBuffer(data)); + return ioSession.writePacket(new ByteArrayBuffer(data)); } /** http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java index 010fa11..cca2eb9 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java @@ -286,7 +286,7 @@ public class TcpipServerChannel extends AbstractServerChannel { ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Data length exceeds int boundaries: %d", len); // Make sure we copy the data as the incoming buffer may be reused Buffer buf = ByteArrayBuffer.getCompactClone(data, off, (int) len); - ioSession.write(buf).addListener(future -> { + ioSession.writePacket(buf).addListener(future -> { if (future.isWritten()) { handleWriteDataSuccess(SshConstants.SSH_MSG_CHANNEL_DATA, buf.array(), 0, (int) len); } else { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java index 1385459..1254a14 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java @@ -167,9 +167,10 @@ public abstract class AbstractServerSession extends AbstractSession implements S * identification string - ignored if {@code null}/empty * @return An {@link IoWriteFuture} that can be used to be notified of * identification data being written successfully or failing + * @throws IOException If failed to send identification * @see <A HREF="https://tools.ietf.org/html/rfc4253#section-4.2">RFC 4253 - section 4.2</A> */ - protected IoWriteFuture sendServerIdentification(String... headerLines) { + protected IoWriteFuture sendServerIdentification(String... headerLines) throws IOException { serverVersion = resolveIdentificationString(ServerFactoryManager.SERVER_IDENTIFICATION); String ident = serverVersion; @@ -362,7 +363,7 @@ public abstract class AbstractServerSession extends AbstractSession implements S } if (GenericUtils.length(errorMessage) > 0) { - ioSession.write(new ByteArrayBuffer((errorMessage + "\n").getBytes(StandardCharsets.UTF_8))) + ioSession.writePacket(new ByteArrayBuffer((errorMessage + "\n").getBytes(StandardCharsets.UTF_8))) .addListener(future -> close(true)); throw new SshException(errorMessage); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java index 3d95af7..6f99fed 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java @@ -95,7 +95,7 @@ public class ChannelForwardedX11 extends AbstractClientChannel { Window wLocal = getLocalWindow(); wLocal.consumeAndCheck(len); // use a clone in case data buffer is re-used - serverSession.write(ByteArrayBuffer.getCompactClone(data, off, (int) len)); + serverSession.writePacket(ByteArrayBuffer.getCompactClone(data, off, (int) len)); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java index 1d89ee8..cac8e32 100644 --- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java @@ -231,10 +231,23 @@ public class WindowAdjustTest extends BaseTestSupport { futureHolder.set(service.submit((Runnable) () -> { log.info("Start heavy load sending " + sendCount + " messages of " + msg.length + " bytes"); for (int i = 0; i < sendCount; i++) { - pendingWrapper.write(new ByteArrayBuffer(msg)); + try { + pendingWrapper.write(new ByteArrayBuffer(msg)); + } catch (IOException e) { + log.error("Failed ({}) to send message #{}/{}: {}", + e.getClass().getSimpleName(), i + 1, sendCount, e.getMessage()); + throw new RuntimeException(e); + } } log.info("Sending EOF signal"); - pendingWrapper.write(new ByteArrayBuffer(new byte[]{eofSignal})); + + try { + pendingWrapper.write(new ByteArrayBuffer(new byte[]{eofSignal})); + } catch (IOException e) { + log.error("Failed ({}) to send EOF message after {} messages: {}", + e.getClass().getSimpleName(), sendCount, e.getMessage()); + throw new RuntimeException(e); + } })); log.info("Started"); } @@ -270,9 +283,9 @@ public class WindowAdjustTest extends BaseTestSupport { this.asyncIn = out; } - public synchronized void write(final Object msg) { + public synchronized void write(Object msg) throws IOException { if ((asyncIn != null) && (!asyncIn.isClosed()) && (!asyncIn.isClosing())) { - final Buffer byteBufferMsg = (Buffer) msg; + Buffer byteBufferMsg = (Buffer) msg; if (!pending.isEmpty()) { queueRequest(byteBufferMsg); return; @@ -282,14 +295,19 @@ public class WindowAdjustTest extends BaseTestSupport { } } - private void writeWithPendingDetection(final Buffer msg, final boolean wasPending) { + private void writeWithPendingDetection(Buffer msg, boolean wasPending) throws IOException { try { - asyncIn.write(msg).addListener(future -> { + asyncIn.writePacket(msg).addListener(future -> { if (future.isWritten()) { if (wasPending) { pending.remove(); } - writePendingIfAny(); + + try { + writePendingIfAny(); + } catch (IOException e) { + log.error("Failed ({}) to re-write pending: {}", e.getClass().getSimpleName(), e.getMessage()); + } } else { Throwable t = future.getException(); log.warn("Failed to write message", t); @@ -302,16 +320,16 @@ public class WindowAdjustTest extends BaseTestSupport { } } - private synchronized void writePendingIfAny() { + private synchronized void writePendingIfAny() throws IOException { if (pending.peek() == null) { return; } - final Buffer msg = pending.peek(); + Buffer msg = pending.peek(); writeWithPendingDetection(msg, true); } - private void queueRequest(final Buffer msg) { + private void queueRequest(Buffer msg) { msg.rpos(0); pending.add(msg); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java index 4968282..cbfa56f 100644 --- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java @@ -77,6 +77,8 @@ import org.apache.sshd.common.channel.TestChannelListener; import org.apache.sshd.common.config.keys.KeyUtils; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; +import org.apache.sshd.common.io.IoInputStream; +import org.apache.sshd.common.io.IoOutputStream; import org.apache.sshd.common.io.IoReadFuture; import org.apache.sshd.common.io.IoSession; import org.apache.sshd.common.io.IoWriteFuture; @@ -509,15 +511,16 @@ public class ClientTest extends BaseTestSupport { ByteArrayOutputStream baosErr = new ByteArrayOutputStream()) { AtomicInteger writes = new AtomicInteger(nbMessages); - channel.getAsyncIn().write(new ByteArrayBuffer(message)).addListener(new SshFutureListener<IoWriteFuture>() { + IoOutputStream asyncIn = channel.getAsyncIn(); + asyncIn.writePacket(new ByteArrayBuffer(message)).addListener(new SshFutureListener<IoWriteFuture>() { @Override public void operationComplete(IoWriteFuture future) { try { if (future.isWritten()) { if (writes.decrementAndGet() > 0) { - channel.getAsyncIn().write(new ByteArrayBuffer(message)).addListener(this); + asyncIn.writePacket(new ByteArrayBuffer(message)).addListener(this); } else { - channel.getAsyncIn().close(false); + asyncIn.close(false); } } else { throw new SshException("Error writing", future.getException()); @@ -529,7 +532,9 @@ public class ClientTest extends BaseTestSupport { } } }); - channel.getAsyncOut().read(new ByteArrayBuffer()).addListener(new SshFutureListener<IoReadFuture>() { + + IoInputStream asyncOut = channel.getAsyncOut(); + asyncOut.read(new ByteArrayBuffer()).addListener(new SshFutureListener<IoReadFuture>() { @Override public void operationComplete(IoReadFuture future) { try { @@ -539,7 +544,7 @@ public class ClientTest extends BaseTestSupport { baosOut.write(buffer.array(), buffer.rpos(), buffer.available()); buffer.rpos(buffer.rpos() + buffer.available()); buffer.compact(); - channel.getAsyncOut().read(buffer).addListener(this); + asyncOut.read(buffer).addListener(this); } catch (IOException e) { if (!channel.isClosing()) { channel.close(true); @@ -547,7 +552,9 @@ public class ClientTest extends BaseTestSupport { } } }); - channel.getAsyncErr().read(new ByteArrayBuffer()).addListener(new SshFutureListener<IoReadFuture>() { + + IoInputStream asyncErr = channel.getAsyncErr(); + asyncErr.read(new ByteArrayBuffer()).addListener(new SshFutureListener<IoReadFuture>() { @Override public void operationComplete(IoReadFuture future) { try { @@ -557,7 +564,7 @@ public class ClientTest extends BaseTestSupport { baosErr.write(buffer.array(), buffer.rpos(), buffer.available()); buffer.rpos(buffer.rpos() + buffer.available()); buffer.compact(); - channel.getAsyncErr().read(buffer).addListener(this); + asyncErr.read(buffer).addListener(this); } catch (IOException e) { if (!channel.isClosing()) { channel.close(true); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java index c06f3c3..93c37be 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java @@ -275,7 +275,7 @@ public class WindowTest extends BaseTestSupport { IoInputStream input = channel.getAsyncOut(); for (int i = 0; i < nbMessages; i++) { Buffer buffer = new ByteArrayBuffer(bytes); - output.write(buffer).verify(5L, TimeUnit.SECONDS); + output.writePacket(buffer).verify(5L, TimeUnit.SECONDS); waitForWindowNotEquals(clientLocal, serverRemote, "client local", "server remote", TimeUnit.SECONDS.toMillis(3L)); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java index bb6b4cd..538f2cd 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java @@ -18,7 +18,9 @@ */ package org.apache.sshd.common.session.helpers; +import java.io.EOFException; import java.io.IOException; +import java.io.WriteAbortedException; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; import java.util.List; @@ -359,12 +361,12 @@ public class AbstractSessionTest extends BaseTestSupport { } @Override - public IoWriteFuture write(Buffer buffer) { + public IoWriteFuture writePacket(Buffer buffer) throws IOException { if (!isOpen()) { - throw new IllegalStateException("Not open"); + throw new EOFException("Not open"); } if (!outgoing.offer(buffer)) { - throw new IllegalStateException("Failed to offer outgoing buffer"); + throw new WriteAbortedException("Failed to offer outgoing buffer", new IllegalStateException("Offer failure")); } IoWriteFutureImpl future = new IoWriteFutureImpl(buffer); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java b/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java index ae59e12..509b6b7 100644 --- a/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java @@ -130,7 +130,7 @@ public class ServerProxyAcceptorTest extends BaseTestSupport { client.setClientProxyConnector(session -> { IoSession ioSession = session.getIoSession(); - ioSession.write(new ByteArrayBuffer(metaDataBytes)); + ioSession.writePacket(new ByteArrayBuffer(metaDataBytes)); }); client.start(); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java index 7a077a4..49affe7 100644 --- a/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java @@ -45,7 +45,7 @@ public class ChannelSessionTest extends BaseTestSupport { */ @Test public void testHandleWindowAdjust() throws Exception { - final Buffer buffer = new ByteArrayBuffer(); + Buffer buffer = new ByteArrayBuffer(); buffer.putInt(1234); try (ChannelSession channelSession = new ChannelSession() { @@ -54,7 +54,7 @@ public class ChannelSessionTest extends BaseTestSupport { wRemote.init(PropertyResolverUtils.toPropertyResolver(Collections.emptyMap())); } }) { - final AtomicBoolean expanded = new AtomicBoolean(false); + AtomicBoolean expanded = new AtomicBoolean(false); channelSession.asyncOut = new ChannelAsyncOutputStream(new BogusChannel(), (byte) 0) { @Override public void onWindowExpanded() throws IOException { @@ -69,7 +69,7 @@ public class ChannelSessionTest extends BaseTestSupport { @Test // see SSHD-652 public void testCloseFutureListenerRegistration() throws Exception { - final AtomicInteger closeCount = new AtomicInteger(); + AtomicInteger closeCount = new AtomicInteger(); try (ChannelSession session = new ChannelSession() { { Window wRemote = getRemoteWindow(); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java index cef9579..b0ff4bb 100644 --- a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java +++ b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java @@ -140,7 +140,7 @@ public class AsyncEchoShellFactory implements Factory<Command> { if (buffer.charAt(i) == '\n') { String s = buffer.substring(0, i + 1); byte[] bytes = s.getBytes(StandardCharsets.UTF_8); - out.write(new ByteArrayBuffer(bytes)).addListener(future -> { + out.writePacket(new ByteArrayBuffer(bytes)).addListener(future -> { Session session1 = channel.getSession(); if (future.isWritten()) { try { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java index 0ef7bed..96bb36d 100644 --- a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java +++ b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java @@ -23,6 +23,9 @@ import java.io.IOException; import org.apache.sshd.client.future.DefaultOpenFuture; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.channel.AbstractChannel; +import org.apache.sshd.common.channel.Channel; +import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver; +import org.apache.sshd.common.io.PacketWriter; import org.apache.sshd.common.util.buffer.Buffer; public class BogusChannel extends AbstractChannel { @@ -59,4 +62,14 @@ public class BogusChannel extends AbstractChannel { public void handleOpenFailure(Buffer buffer) throws IOException { // ignored } + + @Override + public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() { + return ChannelStreamPacketWriterResolver.NONE; + } + + @Override + public PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) { + return channel; + } }