Repository: mina-sshd
Updated Branches:
  refs/heads/master 1f3006b93 -> e0a6b8da2


[SSHD-768] Introduced PacketWriter + ChannelStreamPacketWriterResolver + 
ThrottlingPacketWriter hooks for user intervention


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e0a6b8da
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e0a6b8da
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e0a6b8da

Branch: refs/heads/master
Commit: e0a6b8da2b9b6af2fa6ff23fdcd76188ab1db2fe
Parents: 1f3006b
Author: Goldstein Lyor <l...@c-b4.com>
Authored: Wed Sep 6 09:22:21 2017 +0300
Committer: Goldstein Lyor <l...@c-b4.com>
Committed: Thu Sep 7 08:51:17 2017 +0300

----------------------------------------------------------------------
 README.md                                       |  14 ++
 .../throttle/ThrottlingPacketWriter.java        | 217 +++++++++++++++++++
 .../throttle/ThrottlingPacketWriterTest.java    | 129 +++++++++++
 .../org/apache/sshd/common/BaseBuilder.java     |  10 +-
 .../org/apache/sshd/common/FactoryManager.java  |   2 +
 .../sshd/common/channel/AbstractChannel.java    |  29 ++-
 .../common/channel/BufferedIoOutputStream.java  |  24 +-
 .../org/apache/sshd/common/channel/Channel.java |   4 +
 .../channel/ChannelAsyncOutputStream.java       |  36 ++-
 .../common/channel/ChannelOutputStream.java     |  31 ++-
 .../ChannelStreamPacketWriterResolver.java      |  45 ++++
 ...hannelStreamPacketWriterResolverManager.java |  44 ++++
 .../apache/sshd/common/forward/SocksProxy.java  |  18 +-
 .../sshd/common/forward/TcpipClientChannel.java |   2 +-
 .../common/helpers/AbstractFactoryManager.java  |  12 +
 .../apache/sshd/common/io/IoOutputStream.java   |  13 +-
 .../org/apache/sshd/common/io/IoSession.java    |  27 +--
 .../org/apache/sshd/common/io/PacketWriter.java |  43 ++++
 .../apache/sshd/common/io/mina/MinaSession.java |   2 +-
 .../apache/sshd/common/io/nio2/Nio2Session.java |   2 +-
 .../org/apache/sshd/common/session/Session.java |  17 +-
 .../common/session/helpers/AbstractSession.java |  33 ++-
 .../sshd/server/forward/TcpipServerChannel.java |   2 +-
 .../server/session/AbstractServerSession.java   |   5 +-
 .../sshd/server/x11/ChannelForwardedX11.java    |   2 +-
 .../java/org/apache/sshd/WindowAdjustTest.java  |  38 +++-
 .../java/org/apache/sshd/client/ClientTest.java |  21 +-
 .../apache/sshd/common/channel/WindowTest.java  |   2 +-
 .../session/helpers/AbstractSessionTest.java    |   8 +-
 .../sshd/server/ServerProxyAcceptorTest.java    |   2 +-
 .../sshd/server/channel/ChannelSessionTest.java |   6 +-
 .../sshd/util/test/AsyncEchoShellFactory.java   |   2 +-
 .../org/apache/sshd/util/test/BogusChannel.java |  13 ++
 33 files changed, 735 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index de1e2dd..0f53709 100644
--- a/README.md
+++ b/README.md
@@ -482,6 +482,17 @@ injecting some dynamic data before the command is 
`start()`-ed.
 associated with this command.
 
 
+### Data stream(s) sizing consideration
+
+Some commands may send/receive large amounts of data over their 
STDIN/STDOUT/STDERR streams. Since (by default) the sending mechanism in SSHD is
+**asynchronous** it may cause _Out of memory_ errors due to one side 
(client/server) generating `SSH_MSG_CHANNEL_DATA` or 
`SSH_MSG_CHANNEL_EXTENDED_DATA`
+at a much higher rate than the other side can consume. This leads to a 
build-up of a packets backlog that eventually consumes all available memory
+(as described in [SSHD-754](https://issues.apache.org/jira/browse/SSHD-754) 
and [SSHD-768](https://issues.apache.org/jira/browse/SSHD-768)). As of
+version 1.7 one can register a `ChannelStreamPacketWriterResolver` at the 
client/server/session/channel level that can enable the user to replace
+the raw channel with some throttling mechanism that will be used for stream 
packets. Such an (experimental) example is the `ThrottlingPacketWriter`
+available in the `sshd-contrib` module. **Note:** if the 
`ChannelStreamPacketWriterResolver` returns a wrapper instance instead of a 
`Channel` then
+it will be **closed** automatically when the stream using it is closed.
+
 ## SCP
 
 Besides the `ScpTransferEventListener`, the SCP module also uses a 
`ScpFileOpener` instance in order to access
@@ -1220,6 +1231,9 @@ reading data - and those that modify it
 * `ProxyProtocolAcceptor` - A working prototype to support the PROXY protocol 
as described in [HAProxy 
Documentation](http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt)
 
 
+* `ThrottlingPacketWriter` - An example of a way to overcome big window sizes 
when sending data - as described in 
[SSHD-754](https://issues.apache.org/jira/browse/SSHD-754)
+and [SSHD-768](https://issues.apache.org/jira/browse/SSHD-768)
+
 # Builtin components
 
 Below is the list of builtin components:

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java
----------------------------------------------------------------------
diff --git 
a/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java
 
b/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java
new file mode 100644
index 0000000..3efcc85
--- /dev/null
+++ 
b/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel.throttle;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.InterruptedByTimeoutException;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sshd.common.PropertyResolver;
+import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.PacketWriter;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+
+/**
+ * A {@link PacketWriter} delegate implementation that &quot;throttles&quot; 
the
+ * output by having a limit on the outstanding packets that have not been sent
+ * yet. The {@link #writePacket(Buffer) writePacket} implementation make sure
+ * that the limit has not been exceeded - if so, then it waits until pending
+ * packets have been successfully sent before sending the next packet.
+ *
+ * <B>Note:</B> {@link #close() closing} the throttler does not close the 
delegate writer
+ *
+ * @author <a href="mailto:d...@mina.apache.org";>Apache MINA SSHD Project</a>
+ */
+public class ThrottlingPacketWriter extends AbstractLoggingBean implements 
PacketWriter, SshFutureListener<IoWriteFuture> {
+    /** Timeout (seconds) for throttling packet writer to wait for pending 
packets send */
+    public static final String WAIT_TIME_PROP = "packet-writer-wait-time";
+
+    /** Default value for {@value #WAIT_TIME_PROP} if none specified */
+    public static final long DEFAULT_MAX_WAIT_TIME = 30L;
+
+    /** Max. pending packets count */
+    public static final String MAX_PEND_COUNT = "packet-writer-max-pend-count";
+
+    /** Default value for {@value #MAX_PEND_COUNT} if none specified */
+    public static final int DEFAULT_PEND_COUNT_MAX = 4096;
+
+    private final boolean traceEnabled;
+    private final PacketWriter delegate;
+    private final int maxPendingPackets;
+    private final long maxWait;
+    private final AtomicBoolean open = new AtomicBoolean(true);
+    private final AtomicInteger availableCount;
+
+    public ThrottlingPacketWriter(Channel channel) {
+        this(channel, channel);
+    }
+
+    public ThrottlingPacketWriter(PacketWriter delegate, PropertyResolver 
resolver) {
+        this(delegate, PropertyResolverUtils.getIntProperty(resolver, 
MAX_PEND_COUNT, DEFAULT_PEND_COUNT_MAX),
+            TimeUnit.SECONDS, PropertyResolverUtils.getLongProperty(resolver, 
WAIT_TIME_PROP, DEFAULT_MAX_WAIT_TIME));
+    }
+
+    public ThrottlingPacketWriter(PacketWriter delegate, int 
maxPendingPackets, TimeUnit waitUnit, long waitCount) {
+        this(delegate, maxPendingPackets, waitUnit.toMillis(waitCount));
+    }
+
+    public ThrottlingPacketWriter(PacketWriter delegate, int 
maxPendingPackets, long maxWait) {
+        this.delegate = Objects.requireNonNull(delegate, "No delegate 
provided");
+        ValidateUtils.checkTrue(maxPendingPackets > 0, "Invalid pending 
packets limit: %d", maxPendingPackets);
+        this.maxPendingPackets = maxPendingPackets;
+        this.availableCount =  new AtomicInteger(maxPendingPackets);
+        ValidateUtils.checkTrue(maxWait > 0L, "Invalid max. pending wait time: 
%d", maxWait);
+        this.maxWait = maxWait;
+        this.traceEnabled = log.isTraceEnabled();
+    }
+
+    public PacketWriter getDelegate() {
+        return delegate;
+    }
+
+    public int getMaxPendingPackets() {
+        return maxPendingPackets;
+    }
+
+    public int getAvailablePacketsCount() {
+        return availableCount.get();
+    }
+
+    public long getMaxWait() {
+        return maxWait;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open.get();
+    }
+
+    @Override
+    public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+        if (!isOpen()) {
+            throw new ClosedSelectorException();
+        }
+
+        long remainWait = getMaxWait();
+        int available;
+        synchronized (availableCount) {
+            while (availableCount.get() == 0) {
+                long waitStart = System.currentTimeMillis();
+                try {
+                    availableCount.wait(remainWait);
+                } catch (InterruptedException e) {
+                    throw new InterruptedIOException("Interrupted after " + 
(System.currentTimeMillis() - waitStart) + " msec.");
+                }
+                long waitDuration = System.currentTimeMillis() - waitStart;
+                if (waitDuration <= 0L) {
+                    waitDuration = 1L;
+                }
+                remainWait -= waitDuration;
+                if (remainWait <= 0L) {
+                    throw new InterruptedByTimeoutException();
+                }
+            }
+
+            available = availableCount.decrementAndGet();
+        }
+
+        if (traceEnabled) {
+            log.trace("writePacket({}) available={} after {} msec.", this, 
available, getMaxWait() - remainWait);
+        }
+        if (available < 0) {
+            throw new EOFException("Negative available packets count: " + 
available);
+        }
+
+        PacketWriter writer = getDelegate();
+        return writer.writePacket(buffer).addListener(this);
+    }
+
+    @Override
+    public void operationComplete(IoWriteFuture future) {
+        if (future.isDone()) {
+            if (future.isWritten()) {
+                int available;
+                synchronized (availableCount) {
+                    available = isOpen() ? availableCount.incrementAndGet() : 
Integer.MIN_VALUE;
+                    availableCount.notifyAll();
+                }
+
+                if (available > 0) {
+                    if (traceEnabled) {
+                        log.trace("operationComplete({}) available={}", this, 
available);
+                    }
+                    return;
+                }
+
+                /*
+                 * If non-positive it may be that close has been signaled or 
mis-count - in any case, don't take any chances
+                 */
+                log.error("operationComplete({}) invalid available count: {}", 
this, available);
+            } else {
+                Throwable err = future.getException();
+                log.error("operationComplete({}) Error ({}) signalled: {}", 
this, err.getClass().getSimpleName(), err.getMessage());
+            }
+        } else {
+            log.error("operationComplete({}) Incomplete future signalled: {}", 
this, future);
+        }
+
+        try {
+            close();
+        } catch (IOException e) {
+            log.warn("operationComplete({}) unexpected ({}) due to close: {}",
+                    this, e.getClass().getSimpleName(), e.getMessage());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (open.getAndSet(false)) {
+            if (log.isDebugEnabled()) {
+                log.debug("close({}) closing");
+            }
+        }
+
+        // Do it again if called - no harm
+        synchronized (availableCount) {
+            availableCount.set(-1);
+            availableCount.notifyAll();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName()
+            + "[delegate=" + getDelegate()
+            + ", maxWait=" + getMaxWait()
+            + ", maxPending=" + getMaxPendingPackets()
+            + ", available=" + getAvailablePacketsCount()
+            + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java
----------------------------------------------------------------------
diff --git 
a/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java
 
b/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java
new file mode 100644
index 0000000..94958c8
--- /dev/null
+++ 
b/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel.throttle;
+
+import java.io.IOException;
+import java.io.StreamCorruptedException;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.InterruptedByTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.common.channel.IoWriteFutureImpl;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.PacketWriter;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * TODO Add javadoc
+ *
+ * @author <a href="mailto:d...@mina.apache.org";>Apache MINA SSHD Project</a>
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ThrottlingPacketWriterTest extends BaseTestSupport {
+    public ThrottlingPacketWriterTest() {
+        super();
+    }
+
+    @Test
+    public void testThrottlerWaitsUntilPacketSendSignalled() throws 
IOException {
+        try (ThrottlingPacketWriter throttler = new ThrottlingPacketWriter(new 
MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
+            int maxSize = throttler.getMaxPendingPackets();
+            List<IoWriteFuture> pendingWrites = new ArrayList<>(maxSize);
+            Buffer buf = new ByteArrayBuffer(Byte.SIZE);
+            for (int index = maxSize; index > 0; index--) {
+                IoWriteFuture future = throttler.writePacket(buf);
+                pendingWrites.add(future);
+                assertEquals("Mismatched available packets count", index - 1, 
throttler.getAvailablePacketsCount());
+            }
+
+            assertEquals("Not all available packet window size exhausted", 0, 
throttler.getAvailablePacketsCount());
+            try {
+                IoWriteFuture future = throttler.writePacket(buf);
+                fail("Unexpected extra packet success: " + future);
+            } catch (InterruptedByTimeoutException e) {
+                // expected
+            }
+
+            int sendSize = pendingWrites.size() / 2;
+            for (int index = 0; index < sendSize; index++) {
+                IoWriteFutureImpl future = (IoWriteFutureImpl) 
pendingWrites.get(index);
+                future.setValue(Boolean.TRUE);
+                assertEquals("Mismatched available packets count", index + 1, 
throttler.getAvailablePacketsCount());
+            }
+
+            for (int index = throttler.getAvailablePacketsCount(); index < 
maxSize; index++) {
+                throttler.writePacket(buf);
+            }
+        }
+    }
+
+    @Test(expected = ClosedSelectorException.class)
+    public void testThrottlerDoesNotSendIfClosed() throws IOException {
+        try (PacketWriter throttler = new ThrottlingPacketWriter(new 
MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
+            assertTrue("Throttler not marked as open", throttler.isOpen());
+            throttler.close();
+            assertFalse("Throttler not marked as closed", throttler.isOpen());
+
+            IoWriteFuture future = throttler.writePacket(new 
ByteArrayBuffer(Byte.SIZE));
+            fail("Unexpected success: " + future);
+        }
+    }
+
+    @Test(expected = ClosedSelectorException.class)
+    public void 
testThrottlerStopsSendingIfExceptionSignaledOnFutureOperationCompletion() 
throws IOException {
+        try (PacketWriter throttler = new ThrottlingPacketWriter(new 
MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
+            assertTrue("Throttler not marked as open", throttler.isOpen());
+
+            IoWriteFutureImpl futureImpl = (IoWriteFutureImpl) 
throttler.writePacket(new ByteArrayBuffer(Byte.SIZE));
+            futureImpl.setValue(new 
StreamCorruptedException(getCurrentTestName()));
+            assertFalse("Throttler not marked as closed", throttler.isOpen());
+
+            IoWriteFuture future = throttler.writePacket(new 
ByteArrayBuffer(Byte.SIZE));
+            fail("Unexpected success: " + future);
+        }
+    }
+
+    private static class MockPacketWriter implements PacketWriter {
+        MockPacketWriter() {
+            super();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return true;
+        }
+
+        @Override
+        public void close() throws IOException {
+            throw new UnsupportedOperationException("Unexpected close call");
+        }
+
+        @Override
+        public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+            return new IoWriteFutureImpl(buffer);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java 
b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
index 250dc63..d7b1732 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.RequestHandler;
+import 
org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
 import org.apache.sshd.common.cipher.BuiltinCiphers;
 import org.apache.sshd.common.cipher.Cipher;
 import org.apache.sshd.common.compression.Compression;
@@ -141,6 +142,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, 
S extends BaseBuilder
     protected ForwardingFilterFactory forwarderFactory;
     protected List<RequestHandler<ConnectionService>> globalRequestHandlers;
     protected ForwardingFilter forwardingFilter;
+    protected ChannelStreamPacketWriterResolver 
channelStreamPacketWriterResolver;
 
     public BaseBuilder() {
         super();
@@ -238,7 +240,12 @@ public class BaseBuilder<T extends AbstractFactoryManager, 
S extends BaseBuilder
         return me();
     }
 
-    public T build(final boolean isFillWithDefaultValues) {
+    public S 
channelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) {
+        channelStreamPacketWriterResolver = resolver;
+        return me();
+    }
+
+    public T build(boolean isFillWithDefaultValues) {
         if (isFillWithDefaultValues) {
             fillWithDefaultValues();
         }
@@ -256,6 +263,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, 
S extends BaseBuilder
         ssh.setForwardingFilter(forwardingFilter);
         ssh.setForwarderFactory(forwarderFactory);
         ssh.setGlobalRequestHandlers(globalRequestHandlers);
+        
ssh.setChannelStreamPacketWriterResolver(channelStreamPacketWriterResolver);
         return ssh;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java 
b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 71e06da..3aef39b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -26,6 +26,7 @@ import org.apache.sshd.agent.SshAgentFactory;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelListenerManager;
 import org.apache.sshd.common.channel.RequestHandler;
+import 
org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
 import org.apache.sshd.common.file.FileSystemFactory;
 import org.apache.sshd.common.forward.ForwardingFilterFactory;
 import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
@@ -51,6 +52,7 @@ public interface FactoryManager
                 SessionListenerManager,
                 ReservedSessionMessagesManager,
                 ChannelListenerManager,
+                ChannelStreamPacketWriterResolverManager,
                 PortForwardingEventListenerManager,
                 AttributeStore,
                 PropertyResolver {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 8f32a2b..2a6c0e9 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -40,6 +40,8 @@ import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.PropertyResolver;
 import org.apache.sshd.common.SshConstants;
+import 
org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
+import 
org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
@@ -98,6 +100,8 @@ public abstract class AbstractChannel
 
     private final Window localWindow;
     private final Window remoteWindow;
+    private ChannelStreamPacketWriterResolver 
channelStreamPacketWriterResolver;
+
     /**
      * A {@link Map} of sent requests - key = request name, value = timestamp 
when
      * request was sent.
@@ -198,6 +202,27 @@ public abstract class AbstractChannel
         shutdownExecutor = shutdown;
     }
 
+    @Override
+    public ChannelStreamPacketWriterResolver 
getChannelStreamPacketWriterResolver() {
+        return channelStreamPacketWriterResolver;
+    }
+
+    @Override
+    public void 
setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver 
resolver) {
+        channelStreamPacketWriterResolver = resolver;
+    }
+
+    @Override
+    public ChannelStreamPacketWriterResolver 
resolveChannelStreamPacketWriterResolver() {
+        ChannelStreamPacketWriterResolver resolver = 
getChannelStreamPacketWriterResolver();
+        if (resolver != null) {
+            return resolver;
+        }
+
+        ChannelStreamPacketWriterResolverManager manager = getSession();
+        return manager.resolveChannelStreamPacketWriterResolver();
+    }
+
     /**
      * Add a channel request to the tracked pending ones if reply is expected
      *
@@ -760,7 +785,8 @@ public abstract class AbstractChannel
         super.doCloseImmediately();
     }
 
-    protected IoWriteFuture writePacket(Buffer buffer) throws IOException {
+    @Override
+    public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         if (!isClosing()) {
             Session s = getSession();
             return s.writePacket(buffer);
@@ -880,6 +906,7 @@ public abstract class AbstractChannel
 
         Window wRemote = getRemoteWindow();
         wRemote.expand(window);
+        notifyStateChanged("SSH_MSG_CHANNEL_WINDOW_ADJUST");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
index 6bacdc8..6737a82 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sshd.common.channel;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -43,22 +44,22 @@ public class BufferedIoOutputStream extends 
AbstractInnerCloseable implements Io
     }
 
     @Override
-    public IoWriteFuture write(Buffer buffer) {
-        IoWriteFutureImpl future = new IoWriteFutureImpl(buffer);
+    public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         if (isClosing()) {
-            future.setValue(new IOException("Closed"));
-        } else {
-            writes.add(future);
-            startWriting();
+            throw new EOFException("Closed");
         }
+
+        IoWriteFutureImpl future = new IoWriteFutureImpl(buffer);
+        writes.add(future);
+        startWriting();
         return future;
     }
 
-    protected void startWriting() {
+    protected void startWriting() throws IOException {
         final IoWriteFutureImpl future = writes.peek();
         if (future != null) {
             if (currentWrite.compareAndSet(null, future)) {
-                out.write(future.getBuffer()).addListener(new 
SshFutureListener<IoWriteFuture>() {
+                out.writePacket(future.getBuffer()).addListener(new 
SshFutureListener<IoWriteFuture>() {
                     @Override
                     public void operationComplete(IoWriteFuture f) {
                         if (f.isWritten()) {
@@ -69,10 +70,15 @@ public class BufferedIoOutputStream extends 
AbstractInnerCloseable implements Io
                         finishWrite();
                     }
 
+                    @SuppressWarnings("synthetic-access")
                     private void finishWrite() {
                         writes.remove(future);
                         currentWrite.compareAndSet(future, null);
-                        startWriting();
+                        try {
+                            startWriting();
+                        } catch (IOException e) {
+                            log.error("finishWrite({}) failed ({}) re-start 
writing", out, e.getClass().getSimpleName());
+                        }
                     }
                 });
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
index fa5e226..2061d26 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
@@ -26,6 +26,8 @@ import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.AttributeStore;
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.PropertyResolver;
+import 
org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
+import org.apache.sshd.common.io.PacketWriter;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.GenericUtils;
@@ -41,6 +43,8 @@ public interface Channel
         extends ChannelListenerManager,
                 PropertyResolver,
                 AttributeStore,
+                PacketWriter,
+                ChannelStreamPacketWriterResolverManager,
                 Closeable {
     // Known types of channels
     String CHANNEL_EXEC = "exec";

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index e0da5c0..e525355 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sshd.common.channel;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
@@ -27,19 +28,21 @@ import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.PacketWriter;
 import org.apache.sshd.common.io.WritePendingException;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.closeable.AbstractCloseable;
 
 public class ChannelAsyncOutputStream extends AbstractCloseable implements 
IoOutputStream, ChannelHolder {
-
     private final Channel channelInstance;
+    private final PacketWriter packetWriter;
     private final byte cmd;
     private final AtomicReference<IoWriteFutureImpl> pendingWrite = new 
AtomicReference<>();
 
     public ChannelAsyncOutputStream(Channel channel, byte cmd) {
         this.channelInstance = Objects.requireNonNull(channel, "No channel");
+        this.packetWriter = 
channelInstance.resolveChannelStreamPacketWriter(channel, cmd);
         this.cmd = cmd;
     }
 
@@ -53,20 +56,33 @@ public class ChannelAsyncOutputStream extends 
AbstractCloseable implements IoOut
     }
 
     @Override
-    public synchronized IoWriteFuture write(Buffer buffer) {
-        IoWriteFutureImpl future = new IoWriteFutureImpl(buffer);
+    public synchronized IoWriteFuture writePacket(Buffer buffer) throws 
IOException {
         if (isClosing()) {
-            future.setValue(new IOException("Closed"));
-        } else {
-            if (!pendingWrite.compareAndSet(null, future)) {
-                throw new WritePendingException("No write pending future");
-            }
-            doWriteIfPossible(false);
+            throw new EOFException("Closed");
+        }
+
+        IoWriteFutureImpl future = new IoWriteFutureImpl(buffer);
+        if (!pendingWrite.compareAndSet(null, future)) {
+            throw new WritePendingException("No write pending future");
         }
+        doWriteIfPossible(false);
         return future;
     }
 
     @Override
+    protected void preClose() {
+        if (!(packetWriter instanceof Channel)) {
+            try {
+                packetWriter.close();
+            } catch (IOException e) {
+                log.error("preClose({}) Failed ({}) to pre-close packet 
writer: {}", this, e.getClass().getSimpleName(), e.getMessage());
+            }
+        }
+
+        super.preClose();
+    }
+
+    @Override
     protected CloseFuture doCloseGracefully() {
         return builder().when(pendingWrite.get()).build().close(false);
     }
@@ -114,7 +130,7 @@ public class ChannelAsyncOutputStream extends 
AbstractCloseable implements IoOut
 
                 try {
                     ChannelAsyncOutputStream stream = this;
-                    IoWriteFuture writeFuture = s.writePacket(buf);
+                    IoWriteFuture writeFuture = packetWriter.writePacket(buf);
                     writeFuture.addListener(new 
SshFutureListener<IoWriteFuture>() {
                         @Override
                         public void operationComplete(IoWriteFuture f) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index 6fdb2d1..e52865b 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.io.PacketWriter;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
@@ -44,6 +45,7 @@ public class ChannelOutputStream extends OutputStream 
implements java.nio.channe
     public static final long DEFAULT_WAIT_FOR_SPACE_TIMEOUT = 
TimeUnit.SECONDS.toMillis(30L);
 
     private final AbstractChannel channelInstance;
+    private final PacketWriter packetWriter;
     private final Window remoteWindow;
     private final long maxWaitTimeout;
     private final Logger log;
@@ -62,6 +64,7 @@ public class ChannelOutputStream extends OutputStream 
implements java.nio.channe
 
     public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, 
long maxWaitTimeout, Logger log, byte cmd, boolean eofOnClose) {
         this.channelInstance = Objects.requireNonNull(channel, "No channel");
+        this.packetWriter = 
channelInstance.resolveChannelStreamPacketWriter(channel, cmd);
         this.remoteWindow = Objects.requireNonNull(remoteWindow, "No remote 
window");
         ValidateUtils.checkTrue(maxWaitTimeout > 0L, "Non-positive max. wait 
time: %d", maxWaitTimeout);
         this.maxWaitTimeout = maxWaitTimeout;
@@ -215,7 +218,7 @@ public class ChannelOutputStream extends OutputStream 
implements java.nio.channe
                 if (log.isTraceEnabled()) {
                     log.trace("flush({}) send {} len={}", channel, 
SshConstants.getCommandMessageName(cmd), length);
                 }
-                channel.writePacket(buf);
+                packetWriter.writePacket(buf);
             }
         } catch (WindowClosedException e) {
             if (!closedState.getAndSet(true)) {
@@ -237,17 +240,25 @@ public class ChannelOutputStream extends OutputStream 
implements java.nio.channe
 
     @Override
     public synchronized void close() throws IOException {
-        if (isOpen()) {
-            if (log.isTraceEnabled()) {
-                log.trace("close({}) closing", this);
-            }
+        if (!isOpen()) {
+            return;
+        }
 
-            try {
-                flush();
+        if (log.isTraceEnabled()) {
+            log.trace("close({}) closing", this);
+        }
 
-                if (isEofOnClose()) {
-                    AbstractChannel channel = getChannel();
-                    channel.sendEof();
+        try {
+            flush();
+
+            if (isEofOnClose()) {
+                AbstractChannel channel = getChannel();
+                channel.sendEof();
+            }
+        } finally {
+            try {
+                if (!(packetWriter instanceof Channel)) {
+                    packetWriter.close();
                 }
             } finally {
                 closedState.set(true);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java
new file mode 100644
index 0000000..d5d7821
--- /dev/null
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel.throttle;
+
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.io.PacketWriter;
+
+/**
+ * A special mechanism that enables users to intervene in the way packets are
+ * sent from {@code ChannelOutputStream}-s - e.g., by introducing throttling
+ *
+ * @author <a href="mailto:d...@mina.apache.org";>Apache MINA SSHD Project</a>
+ */
+@FunctionalInterface
+public interface ChannelStreamPacketWriterResolver {
+    /**
+     * An identity resolver - i.e., no special intervention - simply use the 
channel itself
+     */
+    ChannelStreamPacketWriterResolver NONE = (channel, cmd) -> channel;
+
+    /**
+     * @param channel The original {@link Channel}
+     * @param cmd The {@code SSH_MSG_CHANNEL_DATA} or {@code 
SSH_MSG_CHANNEL_EXTENDED_DATA}
+     * command that triggered the resolution
+     * @return The {@link PacketWriter} to use - <B>Note:</B> if the return 
value is
+     * not a {@link Channel} then it will be closed when the stream is closed
+     */
+    PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd);
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
new file mode 100644
index 0000000..e50eeb8
--- /dev/null
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel.throttle;
+
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.io.PacketWriter;
+
+/**
+ * TODO Add javadoc
+ *
+ * @author <a href="mailto:d...@mina.apache.org";>Apache MINA SSHD Project</a>
+ */
+public interface ChannelStreamPacketWriterResolverManager extends 
ChannelStreamPacketWriterResolver {
+    ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver();
+
+    void 
setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver 
resolver);
+
+    default ChannelStreamPacketWriterResolver 
resolveChannelStreamPacketWriterResolver() {
+        ChannelStreamPacketWriterResolver resolver = 
getChannelStreamPacketWriterResolver();
+        return (resolver == null) ? ChannelStreamPacketWriterResolver.NONE : 
resolver;
+    }
+
+    @Override
+    default PacketWriter resolveChannelStreamPacketWriter(Channel channel, 
byte cmd) {
+        ChannelStreamPacketWriterResolver resolver = 
resolveChannelStreamPacketWriterResolver();
+        return (resolver == null) ? channel : 
resolver.resolveChannelStreamPacketWriter(channel, cmd);
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
index 893d145..45e157e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
@@ -177,7 +177,13 @@ public class SocksProxy extends AbstractCloseable 
implements IoHandler {
             buffer.putByte((byte) 0x00);
             buffer.putByte((byte) 0x00);
             buffer.putByte((byte) 0x00);
-            session.write(buffer);
+            try {
+                session.writePacket(buffer);
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                log.error("Failed ({}) to send channel open packet for {}: 
{}", e.getClass().getSimpleName(), channel, e.getMessage());
+                throw new IllegalStateException("Failed to send packet", e);
+            }
         }
 
         private String getNTString(Buffer buffer) {
@@ -214,7 +220,7 @@ public class SocksProxy extends AbstractCloseable 
implements IoHandler {
                 buffer = new ByteArrayBuffer(Byte.SIZE, false);
                 buffer.putByte((byte) 0x05);
                 buffer.putByte((byte) (foundNoAuth ? 0x00 : 0xFF));
-                session.write(buffer);
+                session.writePacket(buffer);
                 if (!foundNoAuth) {
                     throw new IllegalStateException("Received socks5 greeting 
without NoAuth method");
                 } else {
@@ -284,7 +290,13 @@ public class SocksProxy extends AbstractCloseable 
implements IoHandler {
                 response.putByte((byte) 0x00);
             }
             response.wpos(wpos);
-            session.write(response);
+            try {
+                session.writePacket(response);
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                log.error("Failed ({}) to send channel open response for {}: 
{}", e.getClass().getSimpleName(), channel, e.getMessage());
+                throw new IllegalStateException("Failed to send packet", e);
+            }
         }
 
         private String getBLString(Buffer buffer) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index 98bb61e..1f73590 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -133,7 +133,7 @@ public class TcpipClientChannel extends 
AbstractClientChannel {
         Buffer buf = ByteArrayBuffer.getCompactClone(data, off, (int) len);
         Window wLocal = getLocalWindow();
         wLocal.consumeAndCheck(len);
-        serverSession.write(buf);
+        serverSession.writePacket(buf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
index 93175cf..4dcd0d2 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
@@ -40,6 +40,7 @@ import org.apache.sshd.common.SyspropsMapWrapper;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.RequestHandler;
+import 
org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
 import org.apache.sshd.common.config.VersionProperties;
 import org.apache.sshd.common.file.FileSystemFactory;
 import org.apache.sshd.common.forward.ForwardingFilterFactory;
@@ -88,6 +89,7 @@ public abstract class AbstractFactoryManager extends 
AbstractKexFactoryManager i
     private final Map<AttributeKey<?>, Object> attributes = new 
ConcurrentHashMap<>();
     private PropertyResolver parentResolver = 
SyspropsMapWrapper.SYSPROPS_RESOLVER;
     private ReservedSessionMessagesHandler reservedSessionMessagesHandler;
+    private ChannelStreamPacketWriterResolver 
channelStreamPacketWriterResolver;
 
     protected AbstractFactoryManager() {
         ClassLoader loader = getClass().getClassLoader();
@@ -272,6 +274,16 @@ public abstract class AbstractFactoryManager extends 
AbstractKexFactoryManager i
     }
 
     @Override
+    public ChannelStreamPacketWriterResolver 
getChannelStreamPacketWriterResolver() {
+        return channelStreamPacketWriterResolver;
+    }
+
+    @Override
+    public void 
setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver 
resolver) {
+        channelStreamPacketWriterResolver = resolver;
+    }
+
+    @Override
     public void addSessionListener(SessionListener listener) {
         SessionListener.validateListener(listener);
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java 
b/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
index e50a60c..e98e5f0 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
@@ -19,16 +19,7 @@
 package org.apache.sshd.common.io;
 
 import org.apache.sshd.common.Closeable;
-import org.apache.sshd.common.util.buffer.Buffer;
-
-public interface IoOutputStream extends Closeable {
-
-    /**
-     * <B>NOTE:</B> the buffer must not be touched until the returned write 
future is completed.
-     *
-     * @param buffer the {@link Buffer} to use
-     * @return The {@link IoWriteFuture} for the operation
-     */
-    IoWriteFuture write(Buffer buffer);
 
+public interface IoOutputStream extends Closeable, PacketWriter {
+    // nothing extra
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java 
b/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
index 2861b80..3edb6fa 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
@@ -21,10 +21,8 @@ package org.apache.sshd.common.io;
 import java.net.SocketAddress;
 
 import org.apache.sshd.common.Closeable;
-import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.util.buffer.Buffer;
 
-public interface IoSession extends Closeable {
+public interface IoSession extends PacketWriter, Closeable {
 
     /**
      * @return a unique identifier for this session.  Every session has its own
@@ -61,29 +59,6 @@ public interface IoSession extends Closeable {
     SocketAddress getLocalAddress();
 
     /**
-     * Write a packet on the socket.
-     * Multiple writes can be issued concurrently and will be queued.
-     *
-     * @param buffer The {@link Buffer} with the encoded packet data
-     * @return The {@link IoWriteFuture} for the request
-     */
-    IoWriteFuture write(Buffer buffer);
-
-    /**
-     * Closes this session immediately or after all queued write requests
-     * are flushed.  This operation is asynchronous.  Wait for the returned
-     * {@link CloseFuture} if you want to wait for the session actually closed.
-     *
-     * @param immediately {@code true} to close this session immediately.
-     *                    The pending write requests will simply be discarded.
-     *                    {@code false} to close this session after all queued
-     *                    write requests are flushed.
-     * @return The generated {@link CloseFuture}
-     */
-    @Override
-    CloseFuture close(boolean immediately);
-
-    /**
      * @return the {@link IoService} that created this session.
      */
     IoService getService();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/io/PacketWriter.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/io/PacketWriter.java 
b/sshd-core/src/main/java/org/apache/sshd/common/io/PacketWriter.java
new file mode 100644
index 0000000..42cda19
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/PacketWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+
+import org.apache.sshd.common.util.buffer.Buffer;
+
+/**
+ * TODO Add javadoc
+ *
+ * @author <a href="mailto:d...@mina.apache.org";>Apache MINA SSHD Project</a>
+ */
+public interface PacketWriter extends Channel {
+    /**
+     * Encode and send the given buffer. <B>Note:</B> for session packets the 
buffer has to have
+     * 5 bytes free at the beginning to allow the encoding to take place. 
Also, the write position
+     * of the buffer has to be set to the position of the last byte to write.
+     *
+     * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer 
must not be touched
+     * until the returned write future is completed.
+     * @return An {@code IoWriteFuture} that can be used to check when the 
packet has actually been sent
+     * @throws IOException if an error occurred when encoding sending the 
packet
+     */
+    IoWriteFuture writePacket(Buffer buffer) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java 
b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
index d69dc06..02dd74c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
@@ -132,7 +132,7 @@ public class MinaSession extends AbstractInnerCloseable 
implements IoSession {
     }
 
     @Override // NOTE !!! data buffer may NOT be re-used when method returns - 
at least until IoWriteFuture is signalled
-    public IoWriteFuture write(Buffer buffer) {
+    public IoWriteFuture writePacket(Buffer buffer) {
         return write(MinaSupport.asIoBuffer(buffer));
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java 
b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index 1384092..da07efd 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -130,7 +130,7 @@ public class Nio2Session extends AbstractCloseable 
implements IoSession {
     }
 
     @Override
-    public IoWriteFuture write(Buffer buffer) {
+    public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("Writing {} bytes", buffer.available());
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java 
b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index 35bb96d..6e00e26 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -28,12 +28,14 @@ import org.apache.sshd.common.PropertyResolver;
 import org.apache.sshd.common.Service;
 import org.apache.sshd.common.auth.MutableUserHolder;
 import org.apache.sshd.common.channel.ChannelListenerManager;
+import 
org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
 import org.apache.sshd.common.cipher.CipherInformation;
 import org.apache.sshd.common.compression.CompressionInformation;
 import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
 import org.apache.sshd.common.future.KeyExchangeFuture;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.PacketWriter;
 import org.apache.sshd.common.kex.KexFactoryManager;
 import org.apache.sshd.common.kex.KexProposalOption;
 import org.apache.sshd.common.kex.KeyExchange;
@@ -52,12 +54,14 @@ public interface Session
                 SessionListenerManager,
                 ReservedSessionMessagesManager,
                 ChannelListenerManager,
+                ChannelStreamPacketWriterResolverManager,
                 PortForwardingEventListenerManager,
                 FactoryManagerHolder,
                 PropertyResolver,
                 AttributeStore,
                 Closeable,
-                MutableUserHolder {
+                MutableUserHolder,
+                PacketWriter {
 
     /**
      * Default prefix expected for the client / server identification string
@@ -199,17 +203,6 @@ public interface Session
     IoWriteFuture sendIgnoreMessage(byte... data) throws IOException;
 
     /**
-     * Encode and send the given buffer.
-     * The buffer has to have 5 bytes free at the beginning to allow the 
encoding to take place.
-     * Also, the write position of the buffer has to be set to the position of 
the last byte to write.
-     *
-     * @param buffer the buffer to encode and send
-     * @return An {@code IoWriteFuture} that can be used to check when the 
packet has actually been sent
-     * @throws IOException if an error occurred when encoding sending the 
packet
-     */
-    IoWriteFuture writePacket(Buffer buffer) throws IOException;
-
-    /**
      * Encode and send the given buffer with the specified timeout.
      * If the buffer could not be written before the timeout elapses, the 
returned
      * {@link org.apache.sshd.common.io.IoWriteFuture} will be set with a

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index ac2b272..5d01de4 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -55,6 +55,8 @@ import org.apache.sshd.common.Service;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.ChannelListener;
+import 
org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
+import 
org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
 import org.apache.sshd.common.cipher.Cipher;
 import org.apache.sshd.common.cipher.CipherInformation;
 import org.apache.sshd.common.compression.Compression;
@@ -240,6 +242,7 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
      */
     private final Map<AttributeKey<?>, Object> attributes = new 
ConcurrentHashMap<>();
     private ReservedSessionMessagesHandler reservedSessionMessagesHandler;
+    private ChannelStreamPacketWriterResolver 
channelStreamPacketWriterResolver;
 
     /**
      * Create a new session.
@@ -433,6 +436,27 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
         signalSessionEvent(SessionListener.Event.Authenticated);
     }
 
+    @Override
+    public ChannelStreamPacketWriterResolver 
getChannelStreamPacketWriterResolver() {
+        return channelStreamPacketWriterResolver;
+    }
+
+    @Override
+    public void 
setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver 
resolver) {
+        channelStreamPacketWriterResolver = resolver;
+    }
+
+    @Override
+    public ChannelStreamPacketWriterResolver 
resolveChannelStreamPacketWriterResolver() {
+        ChannelStreamPacketWriterResolver resolver = 
getChannelStreamPacketWriterResolver();
+        if (resolver != null) {
+            return resolver;
+        }
+
+        ChannelStreamPacketWriterResolverManager manager = getFactoryManager();
+        return manager.resolveChannelStreamPacketWriterResolver();
+    }
+
     /**
      * <P>Main input point for the MINA framework.</P>
      *
@@ -1063,11 +1087,11 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
         synchronized (encodeLock) {
             if (ignoreBuf != null) {
                 ignoreBuf = encode(ignoreBuf);
-                ioSession.write(ignoreBuf);
+                ioSession.writePacket(ignoreBuf);
             }
 
             buffer = encode(buffer);
-            future = ioSession.write(buffer);
+            future = ioSession.writePacket(buffer);
         }
 
         return future;
@@ -1447,13 +1471,14 @@ public abstract class AbstractSession extends 
AbstractKexFactoryManager implemen
      * @param ident our identification to send
      * @return {@link IoWriteFuture} that can be used to wait for notification
      * that identification has been send
+     * @throws IOException If failed to send the packet
      */
-    protected IoWriteFuture sendIdentification(String ident) {
+    protected IoWriteFuture sendIdentification(String ident) throws 
IOException {
         byte[] data = (ident + "\r\n").getBytes(StandardCharsets.UTF_8);
         if (log.isDebugEnabled()) {
             log.debug("sendIdentification({}): {}", this, ident.replace('\r', 
'|').replace('\n', '|'));
         }
-        return ioSession.write(new ByteArrayBuffer(data));
+        return ioSession.writePacket(new ByteArrayBuffer(data));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 010fa11..cca2eb9 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -286,7 +286,7 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
         ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Data length exceeds 
int boundaries: %d", len);
         // Make sure we copy the data as the incoming buffer may be reused
         Buffer buf = ByteArrayBuffer.getCompactClone(data, off, (int) len);
-        ioSession.write(buf).addListener(future -> {
+        ioSession.writePacket(buf).addListener(future -> {
             if (future.isWritten()) {
                 handleWriteDataSuccess(SshConstants.SSH_MSG_CHANNEL_DATA, 
buf.array(), 0, (int) len);
             } else {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java
index 1385459..1254a14 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java
@@ -167,9 +167,10 @@ public abstract class AbstractServerSession extends 
AbstractSession implements S
      * identification string - ignored if {@code null}/empty
      * @return An {@link IoWriteFuture} that can be used to be notified of
      * identification data being written successfully or failing
+     * @throws IOException If failed to send identification
      * @see <A HREF="https://tools.ietf.org/html/rfc4253#section-4.2";>RFC 4253 
- section 4.2</A>
      */
-    protected IoWriteFuture sendServerIdentification(String... headerLines) {
+    protected IoWriteFuture sendServerIdentification(String... headerLines) 
throws IOException {
         serverVersion = 
resolveIdentificationString(ServerFactoryManager.SERVER_IDENTIFICATION);
 
         String ident = serverVersion;
@@ -362,7 +363,7 @@ public abstract class AbstractServerSession extends 
AbstractSession implements S
         }
 
         if (GenericUtils.length(errorMessage) > 0) {
-            ioSession.write(new ByteArrayBuffer((errorMessage + 
"\n").getBytes(StandardCharsets.UTF_8)))
+            ioSession.writePacket(new ByteArrayBuffer((errorMessage + 
"\n").getBytes(StandardCharsets.UTF_8)))
                      .addListener(future -> close(true));
             throw new SshException(errorMessage);
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java 
b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
index 3d95af7..6f99fed 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
@@ -95,7 +95,7 @@ public class ChannelForwardedX11 extends 
AbstractClientChannel {
         Window wLocal = getLocalWindow();
         wLocal.consumeAndCheck(len);
         // use a clone in case data buffer is re-used
-        serverSession.write(ByteArrayBuffer.getCompactClone(data, off, (int) 
len));
+        serverSession.writePacket(ByteArrayBuffer.getCompactClone(data, off, 
(int) len));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java 
b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
index 1d89ee8..cac8e32 100644
--- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
@@ -231,10 +231,23 @@ public class WindowAdjustTest extends BaseTestSupport {
             futureHolder.set(service.submit((Runnable) () -> {
                 log.info("Start heavy load sending " + sendCount + " messages 
of " + msg.length + " bytes");
                 for (int i = 0; i < sendCount; i++) {
-                    pendingWrapper.write(new ByteArrayBuffer(msg));
+                    try {
+                        pendingWrapper.write(new ByteArrayBuffer(msg));
+                    } catch (IOException e) {
+                        log.error("Failed ({}) to send message #{}/{}: {}",
+                                e.getClass().getSimpleName(), i + 1, 
sendCount, e.getMessage());
+                        throw new RuntimeException(e);
+                    }
                 }
                 log.info("Sending EOF signal");
-                pendingWrapper.write(new ByteArrayBuffer(new 
byte[]{eofSignal}));
+
+                try {
+                    pendingWrapper.write(new ByteArrayBuffer(new 
byte[]{eofSignal}));
+                } catch (IOException e) {
+                    log.error("Failed ({}) to send EOF message after {} 
messages: {}",
+                            e.getClass().getSimpleName(), sendCount, 
e.getMessage());
+                    throw new RuntimeException(e);
+                }
             }));
             log.info("Started");
         }
@@ -270,9 +283,9 @@ public class WindowAdjustTest extends BaseTestSupport {
             this.asyncIn = out;
         }
 
-        public synchronized void write(final Object msg) {
+        public synchronized void write(Object msg) throws IOException {
             if ((asyncIn != null) && (!asyncIn.isClosed()) && 
(!asyncIn.isClosing())) {
-                final Buffer byteBufferMsg = (Buffer) msg;
+                Buffer byteBufferMsg = (Buffer) msg;
                 if (!pending.isEmpty()) {
                     queueRequest(byteBufferMsg);
                     return;
@@ -282,14 +295,19 @@ public class WindowAdjustTest extends BaseTestSupport {
             }
         }
 
-        private void writeWithPendingDetection(final Buffer msg, final boolean 
wasPending) {
+        private void writeWithPendingDetection(Buffer msg, boolean wasPending) 
throws IOException {
             try {
-                asyncIn.write(msg).addListener(future -> {
+                asyncIn.writePacket(msg).addListener(future -> {
                     if (future.isWritten()) {
                         if (wasPending) {
                             pending.remove();
                         }
-                        writePendingIfAny();
+
+                        try {
+                            writePendingIfAny();
+                        } catch (IOException e) {
+                            log.error("Failed ({}) to re-write pending: {}", 
e.getClass().getSimpleName(), e.getMessage());
+                        }
                     } else {
                         Throwable t = future.getException();
                         log.warn("Failed to write message", t);
@@ -302,16 +320,16 @@ public class WindowAdjustTest extends BaseTestSupport {
             }
         }
 
-        private synchronized void writePendingIfAny() {
+        private synchronized void writePendingIfAny() throws IOException {
             if (pending.peek() == null) {
                 return;
             }
 
-            final Buffer msg = pending.peek();
+            Buffer msg = pending.peek();
             writeWithPendingDetection(msg, true);
         }
 
-        private void queueRequest(final Buffer msg) {
+        private void queueRequest(Buffer msg) {
             msg.rpos(0);
             pending.add(msg);
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java 
b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
index 4968282..cbfa56f 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -77,6 +77,8 @@ import org.apache.sshd.common.channel.TestChannelListener;
 import org.apache.sshd.common.config.keys.KeyUtils;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoReadFuture;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
@@ -509,15 +511,16 @@ public class ClientTest extends BaseTestSupport {
                  ByteArrayOutputStream baosErr = new ByteArrayOutputStream()) {
                 AtomicInteger writes = new AtomicInteger(nbMessages);
 
-                channel.getAsyncIn().write(new 
ByteArrayBuffer(message)).addListener(new SshFutureListener<IoWriteFuture>() {
+                IoOutputStream asyncIn = channel.getAsyncIn();
+                asyncIn.writePacket(new 
ByteArrayBuffer(message)).addListener(new SshFutureListener<IoWriteFuture>() {
                     @Override
                     public void operationComplete(IoWriteFuture future) {
                         try {
                             if (future.isWritten()) {
                                 if (writes.decrementAndGet() > 0) {
-                                    channel.getAsyncIn().write(new 
ByteArrayBuffer(message)).addListener(this);
+                                    asyncIn.writePacket(new 
ByteArrayBuffer(message)).addListener(this);
                                 } else {
-                                    channel.getAsyncIn().close(false);
+                                    asyncIn.close(false);
                                 }
                             } else {
                                 throw new SshException("Error writing", 
future.getException());
@@ -529,7 +532,9 @@ public class ClientTest extends BaseTestSupport {
                         }
                     }
                 });
-                channel.getAsyncOut().read(new 
ByteArrayBuffer()).addListener(new SshFutureListener<IoReadFuture>() {
+
+                IoInputStream asyncOut = channel.getAsyncOut();
+                asyncOut.read(new ByteArrayBuffer()).addListener(new 
SshFutureListener<IoReadFuture>() {
                     @Override
                     public void operationComplete(IoReadFuture future) {
                         try {
@@ -539,7 +544,7 @@ public class ClientTest extends BaseTestSupport {
                             baosOut.write(buffer.array(), buffer.rpos(), 
buffer.available());
                             buffer.rpos(buffer.rpos() + buffer.available());
                             buffer.compact();
-                            
channel.getAsyncOut().read(buffer).addListener(this);
+                            asyncOut.read(buffer).addListener(this);
                         } catch (IOException e) {
                             if (!channel.isClosing()) {
                                 channel.close(true);
@@ -547,7 +552,9 @@ public class ClientTest extends BaseTestSupport {
                         }
                     }
                 });
-                channel.getAsyncErr().read(new 
ByteArrayBuffer()).addListener(new SshFutureListener<IoReadFuture>() {
+
+                IoInputStream asyncErr = channel.getAsyncErr();
+                asyncErr.read(new ByteArrayBuffer()).addListener(new 
SshFutureListener<IoReadFuture>() {
                     @Override
                     public void operationComplete(IoReadFuture future) {
                         try {
@@ -557,7 +564,7 @@ public class ClientTest extends BaseTestSupport {
                             baosErr.write(buffer.array(), buffer.rpos(), 
buffer.available());
                             buffer.rpos(buffer.rpos() + buffer.available());
                             buffer.compact();
-                            
channel.getAsyncErr().read(buffer).addListener(this);
+                            asyncErr.read(buffer).addListener(this);
                         } catch (IOException e) {
                             if (!channel.isClosing()) {
                                 channel.close(true);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java 
b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java
index c06f3c3..93c37be 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java
@@ -275,7 +275,7 @@ public class WindowTest extends BaseTestSupport {
                     IoInputStream input = channel.getAsyncOut();
                     for (int i = 0; i < nbMessages; i++) {
                         Buffer buffer = new ByteArrayBuffer(bytes);
-                        output.write(buffer).verify(5L, TimeUnit.SECONDS);
+                        output.writePacket(buffer).verify(5L, 
TimeUnit.SECONDS);
 
                         waitForWindowNotEquals(clientLocal, serverRemote, 
"client local", "server remote", TimeUnit.SECONDS.toMillis(3L));
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
 
b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
index bb6b4cd..538f2cd 100644
--- 
a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
+++ 
b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.sshd.common.session.helpers;
 
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.WriteAbortedException;
 import java.net.SocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -359,12 +361,12 @@ public class AbstractSessionTest extends BaseTestSupport {
         }
 
         @Override
-        public IoWriteFuture write(Buffer buffer) {
+        public IoWriteFuture writePacket(Buffer buffer) throws IOException {
             if (!isOpen()) {
-                throw new IllegalStateException("Not open");
+                throw new EOFException("Not open");
             }
             if (!outgoing.offer(buffer)) {
-                throw new IllegalStateException("Failed to offer outgoing 
buffer");
+                throw new WriteAbortedException("Failed to offer outgoing 
buffer", new IllegalStateException("Offer failure"));
             }
 
             IoWriteFutureImpl future = new IoWriteFutureImpl(buffer);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java 
b/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java
index ae59e12..509b6b7 100644
--- 
a/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java
+++ 
b/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java
@@ -130,7 +130,7 @@ public class ServerProxyAcceptorTest extends 
BaseTestSupport {
 
         client.setClientProxyConnector(session -> {
             IoSession ioSession = session.getIoSession();
-            ioSession.write(new ByteArrayBuffer(metaDataBytes));
+            ioSession.writePacket(new ByteArrayBuffer(metaDataBytes));
         });
         client.start();
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java
 
b/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java
index 7a077a4..49affe7 100644
--- 
a/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java
+++ 
b/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java
@@ -45,7 +45,7 @@ public class ChannelSessionTest extends BaseTestSupport {
      */
     @Test
     public void testHandleWindowAdjust() throws Exception {
-        final Buffer buffer = new ByteArrayBuffer();
+        Buffer buffer = new ByteArrayBuffer();
         buffer.putInt(1234);
 
         try (ChannelSession channelSession = new ChannelSession() {
@@ -54,7 +54,7 @@ public class ChannelSessionTest extends BaseTestSupport {
                     
wRemote.init(PropertyResolverUtils.toPropertyResolver(Collections.emptyMap()));
                 }
         }) {
-            final AtomicBoolean expanded = new AtomicBoolean(false);
+            AtomicBoolean expanded = new AtomicBoolean(false);
             channelSession.asyncOut = new ChannelAsyncOutputStream(new 
BogusChannel(), (byte) 0) {
                 @Override
                 public void onWindowExpanded() throws IOException {
@@ -69,7 +69,7 @@ public class ChannelSessionTest extends BaseTestSupport {
 
     @Test   // see SSHD-652
     public void testCloseFutureListenerRegistration() throws Exception {
-        final AtomicInteger closeCount = new AtomicInteger();
+        AtomicInteger closeCount = new AtomicInteger();
         try (ChannelSession session = new ChannelSession() {
             {
                 Window wRemote = getRemoteWindow();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java 
b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
index cef9579..b0ff4bb 100644
--- 
a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
+++ 
b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
@@ -140,7 +140,7 @@ public class AsyncEchoShellFactory implements 
Factory<Command> {
                 if (buffer.charAt(i) == '\n') {
                     String s = buffer.substring(0, i + 1);
                     byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
-                    out.write(new ByteArrayBuffer(bytes)).addListener(future 
-> {
+                    out.writePacket(new 
ByteArrayBuffer(bytes)).addListener(future -> {
                         Session session1 = channel.getSession();
                         if (future.isWritten()) {
                             try {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e0a6b8da/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java 
b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
index 0ef7bed..96bb36d 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
@@ -23,6 +23,9 @@ import java.io.IOException;
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.channel.AbstractChannel;
+import org.apache.sshd.common.channel.Channel;
+import 
org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
+import org.apache.sshd.common.io.PacketWriter;
 import org.apache.sshd.common.util.buffer.Buffer;
 
 public class BogusChannel extends AbstractChannel {
@@ -59,4 +62,14 @@ public class BogusChannel extends AbstractChannel {
     public void handleOpenFailure(Buffer buffer) throws IOException {
         // ignored
     }
+
+    @Override
+    public ChannelStreamPacketWriterResolver 
getChannelStreamPacketWriterResolver() {
+        return ChannelStreamPacketWriterResolver.NONE;
+    }
+
+    @Override
+    public PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte 
cmd) {
+        return channel;
+    }
 }

Reply via email to