This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push: new 8e59075 [SSHD-1080] Rework the PacketWriter to split according to the various semantics 8e59075 is described below commit 8e59075bc55c497e4d4056f457e4d657d3c61c1a Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Tue Sep 22 11:38:55 2020 +0200 [SSHD-1080] Rework the PacketWriter to split according to the various semantics --- CHANGES.md | 1 + .../org/apache/sshd/common/io/IoInputStream.java | 5 ++ .../org/apache/sshd/common/io/IoOutputStream.java | 22 +++++- .../java/org/apache/sshd/common/io/IoSession.java | 30 ++++++-- .../org/apache/sshd/common/io/PacketWriter.java | 81 ---------------------- ...ter.java => ThrottlingChannelStreamWriter.java} | 35 +++++----- ...java => ThrottlingChannelStreamWriterTest.java} | 37 +++++----- .../java/org/apache/sshd/common/BaseBuilder.java | 8 +-- .../org/apache/sshd/common/FactoryManager.java | 4 +- .../sshd/common/channel/AbstractChannel.java | 18 ++--- .../common/channel/BufferedIoOutputStream.java | 6 +- .../org/apache/sshd/common/channel/Channel.java | 19 +++-- .../common/channel/ChannelAsyncOutputStream.java | 12 ++-- .../sshd/common/channel/ChannelOutputStream.java | 8 +-- .../sshd/common/channel/SimpleIoOutputStream.java | 5 +- .../sshd/common/channel/StreamingChannel.java | 2 +- .../channel/throttle/ChannelStreamWriter.java | 48 +++++++++++++ ...olver.java => ChannelStreamWriterResolver.java} | 12 ++-- ...ava => ChannelStreamWriterResolverManager.java} | 19 +++-- ...anager.java => DefaultChannelStreamWriter.java} | 36 +++++++--- .../org/apache/sshd/common/forward/SocksProxy.java | 8 +-- .../sshd/common/forward/TcpipClientChannel.java | 2 +- .../common/helpers/AbstractFactoryManager.java | 12 ++-- .../apache/sshd/common/io/nio2/Nio2Session.java | 4 +- .../org/apache/sshd/common/session/Session.java | 34 +++++---- .../common/session/helpers/AbstractSession.java | 49 +++++++++++-- .../sshd/common/session/helpers/SessionHelper.java | 20 +++--- 
.../sshd/server/forward/TcpipServerChannel.java | 34 ++++----- .../sshd/server/session/AbstractServerSession.java | 4 +- .../sshd/server/x11/ChannelForwardedX11.java | 2 +- .../src/test/java/org/apache/sshd/LoadTest.java | 4 -- .../java/org/apache/sshd/WindowAdjustTest.java | 2 +- .../java/org/apache/sshd/client/ClientTest.java | 9 +-- .../org/apache/sshd/common/channel/WindowTest.java | 5 +- .../common/forward/PortForwardingLoadTest.java | 5 +- .../session/helpers/AbstractSessionTest.java | 2 +- .../sshd/server/ServerProxyAcceptorTest.java | 2 +- .../sshd/util/test/AsyncEchoShellFactory.java | 2 +- .../org/apache/sshd/util/test/BogusChannel.java | 14 ++-- .../java/org/apache/sshd/mina/MinaSession.java | 2 +- .../java/org/apache/sshd/netty/NettyIoSession.java | 2 +- .../sshd/sftp/client/impl/DefaultSftpClient.java | 4 +- .../org/apache/sshd/sftp/server/SftpSubsystem.java | 2 +- 43 files changed, 359 insertions(+), 273 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index fc2776d..23f04ef 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -17,6 +17,7 @@ or `-key-file` command line option. * [SSHD-1034](https://issues.apache.org/jira/browse/SSHD-1034) Rename `org.apache.sshd.common.ForwardingFilter` to `Forwarder`. * [SSHD-1035](https://issues.apache.org/jira/browse/SSHD-1035) Move property definitions to common locations. * [SSHD-1038](https://issues.apache.org/jira/browse/SSHD-1038) Refactor packages from a module into a cleaner hierarchy. +* [SSHD-1080](https://issues.apache.org/jira/browse/SSHD-1080) Rework the PacketWriter to split according to the various semantics * [SSHD-1084](https://issues.apache.org/jira/browse/SSHD-1084) Revert the usage of asynchronous streams when forwarding ports. 
## Minor code helpers diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java b/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java index faa509c..8fb86e3 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java @@ -21,6 +21,11 @@ package org.apache.sshd.common.io; import org.apache.sshd.common.Closeable; import org.apache.sshd.common.util.buffer.Buffer; +/** + * Represents a stream that can be read asynchronously. + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ public interface IoInputStream extends Closeable { /** * NOTE: the buffer must not be touched until the returned read future is completed. diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java b/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java index e98e5f0..64b8876 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java @@ -18,8 +18,26 @@ */ package org.apache.sshd.common.io; +import java.io.IOException; + import org.apache.sshd.common.Closeable; +import org.apache.sshd.common.util.buffer.Buffer; + +/** + * Represents a stream that can be written asynchronously. + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public interface IoOutputStream extends Closeable { + + /** + * Write the given buffer. + * + * @param buffer the data to write. <B>NOTE:</B> the buffer must not be touched until the returned write + * future is completed. + * @return An {@code IoWriteFuture} that can be used to check when the data has actually been written. 
+ * @throws IOException if an error occurred when writing the data + */ + IoWriteFuture writeBuffer(Buffer buffer) throws IOException; -public interface IoOutputStream extends Closeable, PacketWriter { - // nothing extra } diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java index f8de2b4..76b0022 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.SocketAddress; import org.apache.sshd.common.Closeable; +import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.net.ConnectionEndpointsIndicator; -public interface IoSession extends ConnectionEndpointsIndicator, PacketWriter, Closeable { +public interface IoSession extends ConnectionEndpointsIndicator, Closeable { /** * @return a unique identifier for this session. Every session has its own ID which is different from any other. @@ -83,6 +85,27 @@ public interface IoSession extends ConnectionEndpointsIndicator, PacketWriter, C Object removeAttribute(Object key); /** + * Write a packet on the socket. Multiple writes can be issued concurrently and will be queued. + * + * @param buffer the buffer send. <B>NOTE:</B> the buffer must not be touched until the returned write future + * is completed. + * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent + * @throws IOException if an error occurred when sending the packet + */ + IoWriteFuture writeBuffer(Buffer buffer) throws IOException; + + /** + * Closes this session immediately or after all queued write requests are flushed. This operation is asynchronous. + * Wait for the returned {@link CloseFuture} if you want to wait for the session actually closed. 
+ * + * @param immediately {@code true} to close this session immediately. The pending write requests will simply be + * discarded. {@code false} to close this session after all queued write requests are flushed. + * @return The generated {@link CloseFuture} + */ + @Override + CloseFuture close(boolean immediately); + + /** * @return the {@link IoService} that created this session. */ IoService getService(); @@ -97,9 +120,8 @@ public interface IoSession extends ConnectionEndpointsIndicator, PacketWriter, C /** * Suspend read operations on this session. May do nothing if not supported by the session implementation. * - * If the session usage includes a graceful shutdown with messages being exchanged, the caller needs to - * take care of resuming reading the input in order to actually be able to carry on the conversation with - * the peer. + * If the session usage includes a graceful shutdown with messages being exchanged, the caller needs to take care of + * resuming reading the input in order to actually be able to carry on the conversation with the peer. */ default void suspendRead() { // Do nothing by default, but can be overriden by implementations diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/PacketWriter.java b/sshd-common/src/main/java/org/apache/sshd/common/io/PacketWriter.java deleted file mode 100644 index 0862728..0000000 --- a/sshd-common/src/main/java/org/apache/sshd/common/io/PacketWriter.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sshd.common.io; - -import java.io.IOException; -import java.nio.channels.Channel; - -import org.apache.sshd.common.util.buffer.Buffer; - -/** - * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> - */ -public interface PacketWriter extends Channel { - /** - * Encode and send the given buffer. <B>Note:</B> for session packets the buffer has to have 5 bytes free at the - * beginning to allow the encoding to take place. Also, the write position of the buffer has to be set to the - * position of the last byte to write. - * - * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer must not be touched until the returned - * write future is completed. - * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent - * @throws IOException if an error occurred when encoding sending the packet - */ - IoWriteFuture writePacket(Buffer buffer) throws IOException; - - /** - * @param len The packet payload size - * @param blockSize The cipher block size - * @param etmMode Whether using "encrypt-then-MAC" mode - * @return The required padding length - */ - static int calculatePadLength(int len, int blockSize, boolean etmMode) { - /* - * Note: according to RFC-4253 section 6: - * - * The minimum size of a packet is 16 (or the cipher block size, whichever is larger) bytes (plus 'mac'). - * - * Since all out ciphers, MAC(s), etc. 
have a block size > 8 then the minimum size of the packet will be at - * least 16 due to the padding at the very least - so even packets that contain an opcode with no arguments will - * be above this value. This avoids an un-necessary call to Math.max(len, 16) for each and every packet - */ - - len++; // the pad length - if (!etmMode) { - len += Integer.BYTES; - } - - /* - * Note: according to RFC-4253 section 6: - * - * Note that the length of the concatenation of 'packet_length', 'padding_length', 'payload', and 'random - * padding' MUST be a multiple of the cipher block size or 8, whichever is larger. - * - * However, we currently do not have ciphers with a block size of less than 8 so we do not take this into - * account in order to accelerate the calculation and avoiding an un-necessary call to Math.max(blockSize, 8) - * for each and every packet. - */ - int pad = (-len) & (blockSize - 1); - if (pad < blockSize) { - pad += blockSize; - } - - return pad; - } -} diff --git a/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java b/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriter.java similarity index 83% rename from sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java rename to sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriter.java index a62e25c..5092b5e 100644 --- a/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java +++ b/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriter.java @@ -34,22 +34,22 @@ import org.apache.sshd.common.PropertyResolver; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoWriteFuture; -import org.apache.sshd.common.io.PacketWriter; import org.apache.sshd.common.util.ValidateUtils; import 
org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.logging.AbstractLoggingBean; /** - * A {@link PacketWriter} delegate implementation that "throttles" the output by having a limit on the - * outstanding packets that have not been sent yet. The {@link #writePacket(Buffer) writePacket} implementation make - * sure that the limit has not been exceeded - if so, then it waits until pending packets have been successfully sent - * before sending the next packet. + * A {@link ChannelStreamWriter} delegate implementation that "throttles" the output by having a limit on the + * outstanding packets that have not been sent yet. The {@link #writeData(Buffer) writePacket} implementation make sure + * that the limit has not been exceeded - if so, then it waits until pending packets have been successfully sent before + * sending the next packet. * * <B>Note:</B> {@link #close() closing} the throttler does not close the delegate writer * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public class ThrottlingPacketWriter extends AbstractLoggingBean implements PacketWriter, SshFutureListener<IoWriteFuture> { +public class ThrottlingChannelStreamWriter extends AbstractLoggingBean + implements ChannelStreamWriter, SshFutureListener<IoWriteFuture> { /** Timeout (seconds) for throttling packet writer to wait for pending packets send */ public static final Property<Duration> WAIT_TIME = Property.durationSec("packet-writer-wait-time", Duration.ofSeconds(30L)); @@ -58,29 +58,30 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean implements Packe public static final Property<Integer> MAX_PEND_COUNT = Property.integer("packet-writer-max-pend-count", 4096); - private final PacketWriter delegate; + private final ChannelStreamWriter delegate; private final int maxPendingPackets; private final long maxWait; private final AtomicBoolean open = new AtomicBoolean(true); private final AtomicInteger availableCount; - public 
ThrottlingPacketWriter(Channel channel) { - this(channel, channel); + public ThrottlingChannelStreamWriter(Channel channel) { + this(new DefaultChannelStreamWriter(channel), channel); } - public ThrottlingPacketWriter(PacketWriter delegate, PropertyResolver resolver) { + public ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, PropertyResolver resolver) { this(delegate, MAX_PEND_COUNT.getRequired(resolver), WAIT_TIME.getRequired(resolver)); } - public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, TimeUnit waitUnit, long waitCount) { + public ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, int maxPendingPackets, TimeUnit waitUnit, + long waitCount) { this(delegate, maxPendingPackets, waitUnit.toMillis(waitCount)); } - public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, Duration maxWait) { + public ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, int maxPendingPackets, Duration maxWait) { this(delegate, maxPendingPackets, maxWait.toMillis()); } - public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, long maxWait) { + public ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, int maxPendingPackets, long maxWait) { this.delegate = Objects.requireNonNull(delegate, "No delegate provided"); ValidateUtils.checkTrue(maxPendingPackets > 0, "Invalid pending packets limit: %d", maxPendingPackets); this.maxPendingPackets = maxPendingPackets; @@ -89,7 +90,7 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean implements Packe this.maxWait = maxWait; } - public PacketWriter getDelegate() { + public ChannelStreamWriter getDelegate() { return delegate; } @@ -111,7 +112,7 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean implements Packe } @Override - public IoWriteFuture writePacket(Buffer buffer) throws IOException { + public IoWriteFuture writeData(Buffer buffer) throws IOException { if (!isOpen()) { throw new ClosedSelectorException(); } @@ 
-147,8 +148,8 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean implements Packe throw new EOFException("Negative available packets count: " + available); } - PacketWriter writer = getDelegate(); - return writer.writePacket(buffer).addListener(this); + ChannelStreamWriter writer = getDelegate(); + return writer.writeData(buffer).addListener(this); } @Override diff --git a/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java b/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriterTest.java similarity index 76% rename from sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java rename to sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriterTest.java index 7bed6dc..14eb5ca 100644 --- a/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java +++ b/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriterTest.java @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import org.apache.sshd.common.channel.IoWriteFutureImpl; import org.apache.sshd.common.io.IoWriteFuture; -import org.apache.sshd.common.io.PacketWriter; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.util.test.BaseTestSupport; @@ -45,27 +44,27 @@ import org.junit.runners.MethodSorters; */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) @Category({ NoIoTestCase.class }) -public class ThrottlingPacketWriterTest extends BaseTestSupport { - public ThrottlingPacketWriterTest() { +public class ThrottlingChannelStreamWriterTest extends BaseTestSupport { + public ThrottlingChannelStreamWriterTest() { super(); } @Test(timeout = 10_000) public void testThrottlerWaitsUntilPacketSendSignalled() throws IOException { - try (ThrottlingPacketWriter throttler - = new 
ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) { + try (ThrottlingChannelStreamWriter throttler + = new ThrottlingChannelStreamWriter(new MockChannelStreamWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) { int maxSize = throttler.getMaxPendingPackets(); List<IoWriteFuture> pendingWrites = new ArrayList<>(maxSize); Buffer buf = new ByteArrayBuffer(Byte.SIZE); for (int index = maxSize; index > 0; index--) { - IoWriteFuture future = throttler.writePacket(buf); + IoWriteFuture future = throttler.writeData(buf); pendingWrites.add(future); assertEquals("Mismatched available packets count", index - 1, throttler.getAvailablePacketsCount()); } assertEquals("Not all available packet window size exhausted", 0, throttler.getAvailablePacketsCount()); try { - IoWriteFuture future = throttler.writePacket(buf); + IoWriteFuture future = throttler.writeData(buf); fail("Unexpected extra packet success: " + future); } catch (InterruptedByTimeoutException e) { // expected @@ -79,41 +78,41 @@ public class ThrottlingPacketWriterTest extends BaseTestSupport { } for (int index = throttler.getAvailablePacketsCount(); index < maxSize; index++) { - throttler.writePacket(buf); + throttler.writeData(buf); } } } @Test(expected = ClosedSelectorException.class, timeout = 10_000) public void testThrottlerDoesNotSendIfClosed() throws IOException { - try (PacketWriter throttler - = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) { + try (ChannelStreamWriter throttler + = new ThrottlingChannelStreamWriter(new MockChannelStreamWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) { assertTrue("Throttler not marked as open", throttler.isOpen()); throttler.close(); assertFalse("Throttler not marked as closed", throttler.isOpen()); - IoWriteFuture future = throttler.writePacket(new ByteArrayBuffer(Byte.SIZE)); + IoWriteFuture future = throttler.writeData(new ByteArrayBuffer(Byte.SIZE)); fail("Unexpected success: " 
+ future); } } @Test(expected = ClosedSelectorException.class, timeout = 10_000) public void testThrottlerStopsSendingIfExceptionSignaledOnFutureOperationCompletion() throws IOException { - try (PacketWriter throttler - = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) { + try (ChannelStreamWriter throttler + = new ThrottlingChannelStreamWriter(new MockChannelStreamWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) { assertTrue("Throttler not marked as open", throttler.isOpen()); - IoWriteFutureImpl futureImpl = (IoWriteFutureImpl) throttler.writePacket(new ByteArrayBuffer(Byte.SIZE)); + IoWriteFutureImpl futureImpl = (IoWriteFutureImpl) throttler.writeData(new ByteArrayBuffer(Byte.SIZE)); futureImpl.setValue(new StreamCorruptedException(getCurrentTestName())); assertFalse("Throttler not marked as closed", throttler.isOpen()); - IoWriteFuture future = throttler.writePacket(new ByteArrayBuffer(Byte.SIZE)); + IoWriteFuture future = throttler.writeData(new ByteArrayBuffer(Byte.SIZE)); fail("Unexpected success: " + future); } } - private static class MockPacketWriter implements PacketWriter { - MockPacketWriter() { + private static class MockChannelStreamWriter implements ChannelStreamWriter { + MockChannelStreamWriter() { super(); } @@ -128,8 +127,8 @@ public class ThrottlingPacketWriterTest extends BaseTestSupport { } @Override - public IoWriteFuture writePacket(Buffer buffer) throws IOException { - return new IoWriteFutureImpl(MockPacketWriter.class.getSimpleName(), buffer); + public IoWriteFuture writeData(Buffer buffer) throws IOException { + return new IoWriteFutureImpl(MockChannelStreamWriter.class.getSimpleName(), buffer); } } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java index 6b7aa97..6c24a5c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java +++ 
b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.sshd.common.channel.ChannelFactory; import org.apache.sshd.common.channel.RequestHandler; -import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver; import org.apache.sshd.common.cipher.BuiltinCiphers; import org.apache.sshd.common.cipher.Cipher; import org.apache.sshd.common.compression.Compression; @@ -148,7 +148,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder protected ForwarderFactory forwarderFactory; protected List<RequestHandler<ConnectionService>> globalRequestHandlers; protected ForwardingFilter forwardingFilter; - protected ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver; + protected ChannelStreamWriterResolver channelStreamPacketWriterResolver; protected UnknownChannelReferenceHandler unknownChannelReferenceHandler; public BaseBuilder() { @@ -247,7 +247,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder return me(); } - public S channelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) { + public S channelStreamPacketWriterResolver(ChannelStreamWriterResolver resolver) { channelStreamPacketWriterResolver = resolver; return me(); } @@ -275,7 +275,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder ssh.setForwardingFilter(forwardingFilter); ssh.setForwarderFactory(forwarderFactory); ssh.setGlobalRequestHandlers(globalRequestHandlers); - ssh.setChannelStreamPacketWriterResolver(channelStreamPacketWriterResolver); + ssh.setChannelStreamWriterResolver(channelStreamPacketWriterResolver); ssh.setUnknownChannelReferenceHandler(unknownChannelReferenceHandler); return ssh; } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java 
b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java index 47340c9..52df0a9 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java @@ -26,7 +26,7 @@ import org.apache.sshd.agent.SshAgentFactory; import org.apache.sshd.common.channel.ChannelFactory; import org.apache.sshd.common.channel.ChannelListenerManager; import org.apache.sshd.common.channel.RequestHandler; -import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager; import org.apache.sshd.common.file.FileSystemFactory; import org.apache.sshd.common.forward.ForwarderFactory; import org.apache.sshd.common.forward.PortForwardingEventListenerManager; @@ -56,7 +56,7 @@ public interface FactoryManager ReservedSessionMessagesManager, SessionDisconnectHandlerManager, ChannelListenerManager, - ChannelStreamPacketWriterResolverManager, + ChannelStreamWriterResolverManager, UnknownChannelReferenceHandlerManager, PortForwardingEventListenerManager, IoServiceEventListenerManager, diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index 1ce2566..8bb4d99 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -41,8 +41,8 @@ import org.apache.sshd.common.Closeable; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.PropertyResolver; import org.apache.sshd.common.SshConstants; -import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver; -import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver; +import 
org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.DefaultCloseFuture; import org.apache.sshd.common.future.SshFutureListener; @@ -105,7 +105,7 @@ public abstract class AbstractChannel private final Window localWindow; private final Window remoteWindow; - private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver; + private ChannelStreamWriterResolver channelStreamPacketWriterResolver; /** * A {@link Map} of sent requests - key = request name, value = timestamp when request was sent. @@ -196,24 +196,24 @@ public abstract class AbstractChannel } @Override - public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() { + public ChannelStreamWriterResolver getChannelStreamWriterResolver() { return channelStreamPacketWriterResolver; } @Override - public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) { + public void setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver) { channelStreamPacketWriterResolver = resolver; } @Override - public ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() { - ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver(); + public ChannelStreamWriterResolver resolveChannelStreamWriterResolver() { + ChannelStreamWriterResolver resolver = getChannelStreamWriterResolver(); if (resolver != null) { return resolver; } - ChannelStreamPacketWriterResolverManager manager = getSession(); - return manager.resolveChannelStreamPacketWriterResolver(); + ChannelStreamWriterResolverManager manager = getSession(); + return manager.resolveChannelStreamWriterResolver(); } /** diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java index f1f64fd..3ee3ece 100644 --- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java @@ -32,7 +32,7 @@ import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.closeable.AbstractInnerCloseable; /** - * An {@link IoOutputStream} capable of queuing write requests + * An {@link IoOutputStream} capable of queuing write requests. */ public class BufferedIoOutputStream extends AbstractInnerCloseable implements IoOutputStream { protected final IoOutputStream out; @@ -50,7 +50,7 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable implements Io } @Override - public IoWriteFuture writePacket(Buffer buffer) throws IOException { + public IoWriteFuture writeBuffer(Buffer buffer) throws IOException { if (isClosing()) { throw new EOFException("Closed - state=" + state); } @@ -71,7 +71,7 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable implements Io return; } - out.writePacket(future.getBuffer()).addListener( + out.writeBuffer(future.getBuffer()).addListener( new SshFutureListener<IoWriteFuture>() { @Override public void operationComplete(IoWriteFuture f) { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java index 4374e88..dae4aa1 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java @@ -28,8 +28,8 @@ import org.apache.sshd.common.AttributeRepository; import org.apache.sshd.common.AttributeStore; import org.apache.sshd.common.Closeable; import org.apache.sshd.common.PropertyResolver; -import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager; -import org.apache.sshd.common.io.PacketWriter; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager; +import 
org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.session.SessionHolder; @@ -47,8 +47,7 @@ public interface Channel ChannelListenerManager, PropertyResolver, AttributeStore, - PacketWriter, - ChannelStreamPacketWriterResolverManager, + ChannelStreamWriterResolverManager, Closeable { // Known types of channels String CHANNEL_EXEC = "exec"; @@ -224,4 +223,16 @@ public interface Channel T value = channel.getAttribute(key); return (value != null) ? value : Session.resolveAttribute(channel.getSession(), key); } + + /** + * Encode and send the given buffer. <B>Note:</B> for session packets the buffer has to have 5 bytes free at the + * beginning to allow the encoding to take place. Also, the write position of the buffer has to be set to the + * position of the last byte to write. + * + * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer must not be touched until the returned + * write future is completed. 
+ * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent + * @throws IOException if an error occurred when encoding or sending the packet + */ + IoWriteFuture writePacket(Buffer buffer) throws IOException; } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java index 8b69e69..8d1701f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java @@ -24,10 +24,10 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriter; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.io.IoOutputStream; import org.apache.sshd.common.io.IoWriteFuture; -import org.apache.sshd.common.io.PacketWriter; import org.apache.sshd.common.io.WritePendingException; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.buffer.Buffer; @@ -36,14 +36,14 @@ import org.apache.sshd.common.util.closeable.AbstractCloseable; public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream, ChannelHolder { private final Channel channelInstance; - private final PacketWriter packetWriter; + private final ChannelStreamWriter packetWriter; private final byte cmd; private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<>(); private final Object packetWriteId; public ChannelAsyncOutputStream(Channel channel, byte cmd) { this.channelInstance = Objects.requireNonNull(channel, "No channel"); - this.packetWriter = channelInstance.resolveChannelStreamPacketWriter(channel, cmd); + this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd); this.cmd = cmd; 
this.packetWriteId = channel.toString() + "[" + SshConstants.getCommandMessageName(cmd) + "]"; } @@ -58,14 +58,14 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut } @Override - public synchronized IoWriteFuture writePacket(Buffer buffer) throws IOException { + public synchronized IoWriteFuture writeBuffer(Buffer buffer) throws IOException { if (isClosing()) { throw new EOFException("Closing: " + state); } IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId, buffer); if (!pendingWrite.compareAndSet(null, future)) { - throw new WritePendingException("No write pending future"); + throw new WritePendingException("A write operation is already pending"); } doWriteIfPossible(false); return future; @@ -164,7 +164,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut remoteWindow.consume(length); try { - IoWriteFuture writeFuture = packetWriter.writePacket(buf); + IoWriteFuture writeFuture = packetWriter.writeData(buf); writeFuture.addListener(f -> onWritten(future, total, length, f)); } catch (IOException e) { future.setValue(e); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java index b78ff72..42f4e28 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java @@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; import org.apache.sshd.common.channel.exception.SshChannelClosedException; -import org.apache.sshd.common.io.PacketWriter; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriter; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; @@ -44,7 
+44,7 @@ import org.slf4j.Logger; public class ChannelOutputStream extends OutputStream implements java.nio.channels.Channel, ChannelHolder { private final AbstractChannel channelInstance; - private final PacketWriter packetWriter; + private final ChannelStreamWriter packetWriter; private final Window remoteWindow; private final Duration maxWaitTimeout; private final Logger log; @@ -76,7 +76,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe AbstractChannel channel, Window remoteWindow, Duration maxWaitTimeout, Logger log, byte cmd, boolean eofOnClose) { this.channelInstance = Objects.requireNonNull(channel, "No channel"); - this.packetWriter = channelInstance.resolveChannelStreamPacketWriter(channel, cmd); + this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd); this.remoteWindow = Objects.requireNonNull(remoteWindow, "No remote window"); Objects.requireNonNull(maxWaitTimeout, "No maxWaitTimeout"); ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), "Non-positive max. 
wait time: %s", @@ -240,7 +240,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe log.trace("flush({}) send {} len={}", channel, SshConstants.getCommandMessageName(cmd), length); } - packetWriter.writePacket(buf); + packetWriter.writeData(buf); } } catch (WindowClosedException e) { if (!closedState.getAndSet(true)) { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java index 6fee66a..7bdba6b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java @@ -26,12 +26,11 @@ import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.closeable.AbstractCloseable; import org.apache.sshd.common.util.io.IoUtils; -import org.apache.sshd.server.forward.TcpipServerChannel; /** * An implementation of {@link IoOutputStream} using a synchronous {@link ChannelOutputStream}. 
* - * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public class SimpleIoOutputStream extends AbstractCloseable implements IoOutputStream { @@ -48,7 +47,7 @@ public class SimpleIoOutputStream extends AbstractCloseable implements IoOutputS } @Override - public IoWriteFuture writePacket(Buffer buffer) throws IOException { + public IoWriteFuture writeBuffer(Buffer buffer) throws IOException { os.write(buffer.array(), buffer.rpos(), buffer.available()); os.flush(); DefaultIoWriteFuture f = new DefaultIoWriteFuture(this, null); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java index e2d7b94..0b33ad5 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java @@ -21,7 +21,7 @@ package org.apache.sshd.common.channel; /** * A channel that can be either configured to use synchronous or asynchrounous streams. * - * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public interface StreamingChannel { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriter.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriter.java new file mode 100644 index 0000000..a9b643a --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.channel.throttle; + +import java.io.IOException; +import java.nio.channels.Channel; + +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.util.buffer.Buffer; + +/** + * The ChannelStreamWriter is used when writing to the channel data stream. This data is encoded and sent with the + * {@link org.apache.sshd.common.SshConstants#SSH_MSG_CHANNEL_DATA} and + * {@link org.apache.sshd.common.SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA} commands. + * + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public interface ChannelStreamWriter extends Channel { + + /** + * Encode and send the given data packet buffer. <B>Note:</B> the buffer has to have 5 bytes free at the beginning + * to allow the encoding to take place. Also, the write position of the buffer has to be set to the position of the + * last byte to write. + * + * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer must not be touched until the returned + * write future is completed. 
+ * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent + * @throws IOException if an error occurred when encoding or sending the packet + */ + IoWriteFuture writeData(Buffer buffer) throws IOException; + +} diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolver.java similarity index 77% rename from sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java rename to sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolver.java index 75aaaea..0b71544 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolver.java @@ -19,7 +19,6 @@ package org.apache.sshd.common.channel.throttle; import org.apache.sshd.common.channel.Channel; -import org.apache.sshd.common.io.PacketWriter; /** * A special mechanism that enables users to intervene in the way packets are sent from {@code ChannelOutputStream}-s - @@ -28,18 +27,19 @@ import org.apache.sshd.common.io.PacketWriter; * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ @FunctionalInterface -public interface ChannelStreamPacketWriterResolver { +public interface ChannelStreamWriterResolver { /** * An identity resolver - i.e., no special intervention - simply use the channel itself */ - ChannelStreamPacketWriterResolver NONE = (channel, cmd) -> channel; + ChannelStreamWriterResolver NONE = (channel, cmd) -> new DefaultChannelStreamWriter(channel); /** * @param channel The original {@link Channel} * @param cmd The {@code SSH_MSG_CHANNEL_DATA} or {@code SSH_MSG_CHANNEL_EXTENDED_DATA} command that triggered * the resolution - * @return The {@link PacketWriter} to use - 
<B>Note:</B> if the return value is not a {@link Channel} then - * it will be closed when the stream is closed + * @return The {@link ChannelStreamWriter} to use - <B>Note:</B> if the return value is not a + * {@link Channel} then it will be closed when the stream is closed */ - PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd); + ChannelStreamWriter resolveChannelStreamWriter(Channel channel, byte cmd); + } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolverManager.java similarity index 54% copy from sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java copy to sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolverManager.java index e50eeb8..34f8391 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolverManager.java @@ -19,26 +19,25 @@ package org.apache.sshd.common.channel.throttle; import org.apache.sshd.common.channel.Channel; -import org.apache.sshd.common.io.PacketWriter; /** * TODO Add javadoc * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public interface ChannelStreamPacketWriterResolverManager extends ChannelStreamPacketWriterResolver { - ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver(); +public interface ChannelStreamWriterResolverManager extends ChannelStreamWriterResolver { + ChannelStreamWriterResolver getChannelStreamWriterResolver(); - void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver); + void setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver); - default ChannelStreamPacketWriterResolver 
resolveChannelStreamPacketWriterResolver() { - ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver(); - return (resolver == null) ? ChannelStreamPacketWriterResolver.NONE : resolver; + default ChannelStreamWriterResolver resolveChannelStreamWriterResolver() { + return getChannelStreamWriterResolver(); } @Override - default PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) { - ChannelStreamPacketWriterResolver resolver = resolveChannelStreamPacketWriterResolver(); - return (resolver == null) ? channel : resolver.resolveChannelStreamPacketWriter(channel, cmd); + default ChannelStreamWriter resolveChannelStreamWriter(Channel channel, byte cmd) { + ChannelStreamWriterResolver resolver = resolveChannelStreamWriterResolver(); + ChannelStreamWriterResolver actual = (resolver == null) ? ChannelStreamWriterResolver.NONE : resolver; + return actual.resolveChannelStreamWriter(channel, cmd); } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/DefaultChannelStreamWriter.java similarity index 53% rename from sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java rename to sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/DefaultChannelStreamWriter.java index e50eeb8..83b1a91 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/DefaultChannelStreamWriter.java @@ -18,27 +18,41 @@ */ package org.apache.sshd.common.channel.throttle; +import java.io.IOException; + import org.apache.sshd.common.channel.Channel; -import org.apache.sshd.common.io.PacketWriter; +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.util.buffer.Buffer; /** - * TODO Add javadoc + * A 
ChannelStreamWriter that simply calls the {@link Channel#writePacket(Buffer)} method. * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public interface ChannelStreamPacketWriterResolverManager extends ChannelStreamPacketWriterResolver { - ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver(); +public class DefaultChannelStreamWriter implements ChannelStreamWriter { + + protected final Channel channel; + protected volatile boolean closed; - void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver); + public DefaultChannelStreamWriter(Channel channel) { + this.channel = channel; + } - default ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() { - ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver(); - return (resolver == null) ? ChannelStreamPacketWriterResolver.NONE : resolver; + @Override + public IoWriteFuture writeData(Buffer buffer) throws IOException { + if (closed) { + throw new IOException("ChannelStreamPacketWriter has been closed"); + } + return channel.writePacket(buffer); + } + + @Override + public boolean isOpen() { + return !closed; } @Override - default PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) { - ChannelStreamPacketWriterResolver resolver = resolveChannelStreamPacketWriterResolver(); - return (resolver == null) ? 
channel : resolver.resolveChannelStreamPacketWriter(channel, cmd); + public void close() throws IOException { + closed = true; } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java index f87d35c..156780e 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java @@ -103,7 +103,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { protected void onMessage(Buffer buffer) throws IOException { IoOutputStream asyncIn = channel.getAsyncIn(); if (asyncIn != null) { - asyncIn.writePacket(buffer); + asyncIn.writeBuffer(buffer); } else { OutputStream invertedIn = channel.getInvertedIn(); invertedIn.write(buffer.array(), buffer.rpos(), buffer.available()); @@ -185,7 +185,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { buffer.putByte((byte) 0x00); buffer.putByte((byte) 0x00); try { - session.writePacket(buffer); + session.writeBuffer(buffer); } catch (IOException e) { // TODO Auto-generated catch block log.error("Failed ({}) to send channel open packet for {}: {}", e.getClass().getSimpleName(), channel, @@ -229,7 +229,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { buffer = new ByteArrayBuffer(Byte.SIZE, false); buffer.putByte((byte) 0x05); buffer.putByte((byte) (foundNoAuth ? 
0x00 : 0xFF)); - session.writePacket(buffer); + session.writeBuffer(buffer); if (!foundNoAuth) { throw new IllegalStateException("Received socks5 greeting without NoAuth method"); } else if (debugEnabled) { @@ -304,7 +304,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { } response.wpos(wpos); try { - session.writePacket(response); + session.writeBuffer(response); } catch (IOException e) { log.error("Failed ({}) to send channel open response for {}: {}", e.getClass().getSimpleName(), channel, e.getMessage()); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java index c743948..2282b9a 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java @@ -210,7 +210,7 @@ public class TcpipClientChannel extends AbstractClientChannel implements Forward Buffer buf = ByteArrayBuffer.getCompactClone(data, off, (int) len); Window wLocal = getLocalWindow(); wLocal.consumeAndCheck(len); - serverSession.writePacket(buf); + serverSession.writeBuffer(buf); } @Override diff --git a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java index 577da74..3abf709 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java @@ -42,7 +42,7 @@ import org.apache.sshd.common.SyspropsMapWrapper; import org.apache.sshd.common.channel.ChannelFactory; import org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.channel.RequestHandler; -import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver; +import 
org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver; import org.apache.sshd.common.config.VersionProperties; import org.apache.sshd.common.file.FileSystemFactory; import org.apache.sshd.common.forward.ForwarderFactory; @@ -96,7 +96,7 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i private PropertyResolver parentResolver = SyspropsMapWrapper.SYSPROPS_RESOLVER; private ReservedSessionMessagesHandler reservedSessionMessagesHandler; private SessionDisconnectHandler sessionDisconnectHandler; - private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver; + private ChannelStreamWriterResolver channelStreamWriterResolver; private UnknownChannelReferenceHandler unknownChannelReferenceHandler; private IoServiceEventListener eventListener; @@ -314,13 +314,13 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i } @Override - public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() { - return channelStreamPacketWriterResolver; + public ChannelStreamWriterResolver getChannelStreamWriterResolver() { + return channelStreamWriterResolver; } @Override - public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) { - channelStreamPacketWriterResolver = resolver; + public void setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver) { + channelStreamWriterResolver = resolver; } @Override diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java index 2200ba2..af9d0e2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java @@ -166,9 +166,9 @@ public class Nio2Session extends AbstractCloseable implements IoSession { } @Override - public IoWriteFuture writePacket(Buffer buffer) throws IOException { + public 
IoWriteFuture writeBuffer(Buffer buffer) throws IOException { if (log.isDebugEnabled()) { - log.debug("writePacket({}) Writing {} bytes", this, buffer.available()); + log.debug("writeBuffer({}) writing {} bytes", this, buffer.available()); } ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available()); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java index 047dac7..081ccf4 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java @@ -31,13 +31,12 @@ import org.apache.sshd.common.FactoryManagerHolder; import org.apache.sshd.common.Service; import org.apache.sshd.common.auth.MutableUserHolder; import org.apache.sshd.common.channel.ChannelListenerManager; -import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager; import org.apache.sshd.common.forward.PortForwardingEventListenerManager; import org.apache.sshd.common.forward.PortForwardingInformationProvider; import org.apache.sshd.common.future.KeyExchangeFuture; import org.apache.sshd.common.io.IoSession; import org.apache.sshd.common.io.IoWriteFuture; -import org.apache.sshd.common.io.PacketWriter; import org.apache.sshd.common.kex.KexFactoryManager; import org.apache.sshd.common.kex.KeyExchange; import org.apache.sshd.common.session.helpers.TimeoutIndicator; @@ -58,12 +57,11 @@ public interface Session ReservedSessionMessagesManager, SessionDisconnectHandlerManager, ChannelListenerManager, - ChannelStreamPacketWriterResolverManager, + ChannelStreamWriterResolverManager, PortForwardingEventListenerManager, UnknownChannelReferenceHandlerManager, FactoryManagerHolder, - PortForwardingInformationProvider, - PacketWriter { + PortForwardingInformationProvider { /** * Create a new buffer for 
the specified SSH packet and reserve the needed space (5 bytes) for the packet header. @@ -103,7 +101,7 @@ public interface Session * "null" string is sent * @param lang The language - {@code null}/empty if some pre-agreed default is used * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent - * @throws IOException if an error occurred when encoding sending the packet + * @throws IOException if an error occurred when encoding or sending the packet * @see <A HREF="https://tools.ietf.org/html/rfc4253#section-11.3">RFC 4253 - section 11.3</A> */ IoWriteFuture sendDebugMessage(boolean display, Object msg, String lang) throws IOException; @@ -113,12 +111,22 @@ public interface Session * * @param data The message data * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent - * @throws IOException if an error occurred when encoding sending the packet + * @throws IOException if an error occurred when encoding or sending the packet * @see <A HREF="https://tools.ietf.org/html/rfc4253#section-11.2">RFC 4253 - section 11.2</A> */ IoWriteFuture sendIgnoreMessage(byte... data) throws IOException; /** + * Encode and send the given buffer. The buffer has to have 5 bytes free at the beginning to allow the encoding to + * take place. Also, the write position of the buffer has to be set to the position of the last byte to write. + * + * @param buffer the buffer to encode and send + * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent + * @throws IOException if an error occurred when encoding or sending the packet + */ + IoWriteFuture writePacket(Buffer buffer) throws IOException; + + /** * Encode and send the given buffer with the specified timeout. 
If the buffer could not be written before the * timeout elapses, the returned {@link org.apache.sshd.common.io.IoWriteFuture} will be set with a * {@link java.util.concurrent.TimeoutException} exception to indicate a timeout. @@ -127,7 +135,7 @@ public interface Session * @param timeout the (never {@code null}) timeout value - its {@link Duration#toMillis() milliseconds} value * will be used * @return a future that can be used to check when the packet has actually been sent - * @throws IOException if an error occurred when encoding sending the packet + * @throws IOException if an error occurred when encoding or sending the packet * @see #writePacket(Buffer, long) */ default IoWriteFuture writePacket(Buffer buffer, Duration timeout) throws IOException { @@ -143,7 +151,7 @@ public interface Session * @param buffer the buffer to encode and spend * @param maxWaitMillis the timeout in milliseconds * @return a future that can be used to check when the packet has actually been sent - * @throws IOException if an error occurred when encoding sending the packet + * @throws IOException if an error occurred when encoding or sending the packet */ default IoWriteFuture writePacket(Buffer buffer, long maxWaitMillis) throws IOException { return writePacket(buffer, maxWaitMillis, TimeUnit.MILLISECONDS); @@ -158,7 +166,7 @@ public interface Session * @param timeout the timeout * @param unit the time unit of the timeout parameter * @return a future that can be used to check when the packet has actually been sent - * @throws IOException if an error occurred when encoding sending the packet + * @throws IOException if an error occurred when encoding or sending the packet */ IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) throws IOException; @@ -171,7 +179,7 @@ public interface Session * @param timeout The number of time units to wait - must be <U>positive</U> * @param unit The {@link TimeUnit} to wait for the response * @return the return buffer if the request was 
successful, {@code null} otherwise. - * @throws IOException if an error occurred when encoding sending the packet + * @throws IOException if an error occurred when encoding or sending the packet * @throws java.net.SocketTimeoutException If no response received within specified timeout */ default Buffer request( @@ -190,7 +198,7 @@ public interface Session * @param buffer the buffer containing the global request * @param timeout The (never {@code null}) timeout to wait - its milliseconds value is used * @return the return buffer if the request was successful, {@code null} otherwise. - * @throws IOException if an error occurred when encoding sending the packet + * @throws IOException if an error occurred when encoding or sending the packet * @throws java.net.SocketTimeoutException If no response received within specified timeout */ default Buffer request(String request, Buffer buffer, Duration timeout) throws IOException { @@ -206,7 +214,7 @@ public interface Session * @param buffer the buffer containing the global request * @param maxWaitMillis Max. time to wait for response (millis) - must be <U>positive</U> * @return the return buffer if the request was successful, {@code null} otherwise. 
- * @throws IOException if an error occurred when encoding sending the packet + * @throws IOException if an error occurred when encoding or sending the packet * @throws java.net.SocketTimeoutException If no response received within specified timeout */ Buffer request(String request, Buffer buffer, long maxWaitMillis) throws IOException; diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index 636a212..124895f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -63,7 +63,6 @@ import org.apache.sshd.common.future.KeyExchangeFuture; import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoSession; import org.apache.sshd.common.io.IoWriteFuture; -import org.apache.sshd.common.io.PacketWriter; import org.apache.sshd.common.kex.KexProposalOption; import org.apache.sshd.common.kex.KexState; import org.apache.sshd.common.kex.KeyExchange; @@ -254,6 +253,46 @@ public abstract class AbstractSession extends SessionHelper { } } + /** + * @param len The packet payload size + * @param blockSize The cipher block size + * @param etmMode Whether using "encrypt-then-MAC" mode + * @return The required padding length + */ + public static int calculatePadLength(int len, int blockSize, boolean etmMode) { + /* + * Note: according to RFC-4253 section 6: + * + * The minimum size of a packet is 16 (or the cipher block size, whichever is larger) bytes (plus 'mac'). + * + * Since all our ciphers, MAC(s), etc. have a block size > 8 then the minimum size of the packet will be at + * least 16 due to the padding at the very least - so even packets that contain an opcode with no arguments will + * be above this value. 
This avoids an un-necessary call to Math.max(len, 16) for each and every packet + */ + + len++; // the pad length + if (!etmMode) { + len += Integer.BYTES; + } + + /* + * Note: according to RFC-4253 section 6: + * + * Note that the length of the concatenation of 'packet_length', 'padding_length', 'payload', and 'random + * padding' MUST be a multiple of the cipher block size or 8, whichever is larger. + * + * However, we currently do not have ciphers with a block size of less than 8 so we do not take this into + * account in order to accelerate the calculation and avoiding an un-necessary call to Math.max(blockSize, 8) + * for each and every packet. + */ + int pad = (-len) & (blockSize - 1); + if (pad < blockSize) { + pad += blockSize; + } + + return pad; + } + @Override public String getServerVersion() { return serverVersion; @@ -935,7 +974,7 @@ public abstract class AbstractSession extends SessionHelper { ignoreBuf = encode(ignoreBuf); IoSession networkSession = getIoSession(); - networkSession.writePacket(ignoreBuf); + networkSession.writeBuffer(ignoreBuf); } return encode(buffer); @@ -948,7 +987,7 @@ public abstract class AbstractSession extends SessionHelper { synchronized (encodeLock) { Buffer packet = resolveOutputPacket(buffer); IoSession networkSession = getIoSession(); - IoWriteFuture future = networkSession.writePacket(packet); + IoWriteFuture future = networkSession.writeBuffer(packet); return future; } } @@ -1104,7 +1143,7 @@ public abstract class AbstractSession extends SessionHelper { boolean etmMode = outMac != null && outMac.isEncryptThenMac(); int authLen = outCipher != null ? 
outCipher.getAuthenticationTagSize() : 0; boolean authMode = authLen > 0; - int pad = PacketWriter.calculatePadLength(len, outCipherSize, etmMode || authMode); + int pad = calculatePadLength(len, outCipherSize, etmMode || authMode); len += SshConstants.SSH_PACKET_HEADER_LEN + pad + authLen; if (outMac != null) { len += outMacSize; @@ -1204,7 +1243,7 @@ public abstract class AbstractSession extends SessionHelper { boolean authMode = authSize > 0; int oldLen = len; - int pad = PacketWriter.calculatePadLength(len, outCipherSize, etmMode || authMode); + int pad = calculatePadLength(len, outCipherSize, etmMode || authMode); len += Byte.BYTES + pad; diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java index 9c15dda..6883451 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java @@ -48,8 +48,8 @@ import org.apache.sshd.common.PropertyResolver; import org.apache.sshd.common.RuntimeSshException; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; -import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver; -import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager; import org.apache.sshd.common.digest.Digest; import org.apache.sshd.common.forward.Forwarder; import org.apache.sshd.common.future.DefaultSshFuture; @@ -104,7 +104,7 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements private ReservedSessionMessagesHandler reservedSessionMessagesHandler; private SessionDisconnectHandler sessionDisconnectHandler; private UnknownChannelReferenceHandler 
unknownChannelReferenceHandler; - private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver; + private ChannelStreamWriterResolver channelStreamPacketWriterResolver; /** * The name of the authenticated user @@ -514,24 +514,24 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements } @Override - public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() { + public ChannelStreamWriterResolver getChannelStreamWriterResolver() { return channelStreamPacketWriterResolver; } @Override - public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) { + public void setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver) { channelStreamPacketWriterResolver = resolver; } @Override - public ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() { - ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver(); + public ChannelStreamWriterResolver resolveChannelStreamWriterResolver() { + ChannelStreamWriterResolver resolver = getChannelStreamWriterResolver(); if (resolver != null) { return resolver; } - ChannelStreamPacketWriterResolverManager manager = getFactoryManager(); - return manager.resolveChannelStreamPacketWriterResolver(); + ChannelStreamWriterResolverManager manager = getFactoryManager(); + return manager.resolveChannelStreamWriterResolver(); } @Override @@ -793,7 +793,7 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements IoSession networkSession = getIoSession(); byte[] data = (ident + "\r\n").getBytes(StandardCharsets.UTF_8); - return networkSession.writePacket(new ByteArrayBuffer(data)); + return networkSession.writeBuffer(new ByteArrayBuffer(data)); } /** diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java index e14775a..874b49e 100644 --- 
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java @@ -217,24 +217,26 @@ public class TcpipServerChannel extends AbstractServerChannel implements Streami if (streaming == Streaming.Async) { out = new BufferedIoOutputStream( "tcpip channel", new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) { - @SuppressWarnings("synthetic-access") - @Override - protected CloseFuture doCloseGracefully() { - try { - sendEof(); - } catch (IOException e) { - session.exceptionCaught(e); - } - return super.doCloseGracefully(); - } - }); + @SuppressWarnings("synthetic-access") + @Override + protected CloseFuture doCloseGracefully() { + try { + sendEof(); + } catch (IOException e) { + session.exceptionCaught(e); + } + return super.doCloseGracefully(); + } + }); } else { - this.out = new SimpleIoOutputStream(new ChannelOutputStream( - this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true)); + this.out = new SimpleIoOutputStream( + new ChannelOutputStream( + this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true)); } long thresholdHigh = CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH.getRequired(this); - long thresholdLow = CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW.get(this).orElse(thresholdHigh / 2); + long thresholdLow + = CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW.get(this).orElse(thresholdHigh / 2); IoHandler handler = new IoHandler() { @Override @SuppressWarnings("synthetic-access") @@ -251,7 +253,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Streami if (total > thresholdHigh) { session.suspendRead(); } - IoWriteFuture ioWriteFuture = out.writePacket(buffer); + IoWriteFuture ioWriteFuture = out.writeBuffer(buffer); ioWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() { @Override public void 
operationComplete(IoWriteFuture future) { @@ -379,7 +381,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Streami ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Data length exceeds int boundaries: %d", len); // Make sure we copy the data as the incoming buffer may be reused Buffer buf = ByteArrayBuffer.getCompactClone(data, off, (int) len); - ioSession.writePacket(buf).addListener(future -> { + ioSession.writeBuffer(buf).addListener(future -> { if (future.isWritten()) { handleWriteDataSuccess( SshConstants.SSH_MSG_CHANNEL_DATA, buf.array(), 0, (int) len); diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java index 06fcc46..4c2c9b3 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java @@ -325,7 +325,7 @@ public abstract class AbstractServerSession extends AbstractSession implements S startService(authService, buffer); // Now we can inform the peer that authentication is successful - future = networkSession.writePacket(packet); + future = networkSession.writeBuffer(packet); } resetIdleTimeout(); @@ -491,7 +491,7 @@ public abstract class AbstractServerSession extends AbstractSession implements S if (err != null) { IoSession networkSession = getIoSession(); - networkSession.writePacket( + networkSession.writeBuffer( new ByteArrayBuffer((err.getMessage() + "\n").getBytes(StandardCharsets.UTF_8))) .addListener(future -> close(true)); throw err; diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java index 2c1cdee..9dd046a 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java +++ 
b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java @@ -101,7 +101,7 @@ public class ChannelForwardedX11 extends AbstractClientChannel { wLocal.consumeAndCheck(len); // use a clone in case data buffer is re-used Buffer packet = ByteArrayBuffer.getCompactClone(data, off, (int) len); - serverSession.writePacket(packet); + serverSession.writeBuffer(packet); } @Override diff --git a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java index 7afae32..5c33c0c 100644 --- a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java @@ -23,20 +23,16 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.sshd.client.ClientBuilder; import org.apache.sshd.client.SshClient; import org.apache.sshd.client.channel.ClientChannel; import org.apache.sshd.client.channel.ClientChannelEvent; import org.apache.sshd.client.session.ClientSession; import org.apache.sshd.common.channel.Channel; -import org.apache.sshd.common.cipher.BuiltinCiphers; -import org.apache.sshd.common.kex.BuiltinDHFactories; import org.apache.sshd.common.util.security.SecurityUtils; import org.apache.sshd.core.CoreModuleProperties; import org.apache.sshd.server.SshServer; diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java index 98afcb8..bb43718 100644 --- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java @@ -300,7 +300,7 @@ public class WindowAdjustTest extends BaseTestSupport { private void writeWithPendingDetection(Buffer msg, boolean wasPending) throws 
IOException { try { - asyncIn.writePacket(msg).addListener(future -> { + asyncIn.writeBuffer(msg).addListener(future -> { if (future.isWritten()) { if (wasPending) { pending.remove(); diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java index c7487f6..cf2e0cd 100644 --- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java @@ -73,6 +73,7 @@ import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelListener; +import org.apache.sshd.common.channel.StreamingChannel; import org.apache.sshd.common.channel.exception.SshChannelClosedException; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; @@ -523,7 +524,7 @@ public class ClientTest extends BaseTestSupport { try (ClientSession session = createTestClientSession(); ChannelShell channel = session.createShellChannel()) { - channel.setStreaming(ClientChannel.Streaming.Async); + channel.setStreaming(StreamingChannel.Streaming.Async); channel.open().verify(OPEN_TIMEOUT); byte[] message = "0123456789\n".getBytes(StandardCharsets.UTF_8); @@ -533,14 +534,14 @@ public class ClientTest extends BaseTestSupport { AtomicInteger writes = new AtomicInteger(nbMessages); IoOutputStream asyncIn = channel.getAsyncIn(); - asyncIn.writePacket(new ByteArrayBuffer(message)) + asyncIn.writeBuffer(new ByteArrayBuffer(message)) .addListener(new SshFutureListener<IoWriteFuture>() { @Override public void operationComplete(IoWriteFuture future) { try { if (future.isWritten()) { if (writes.decrementAndGet() > 0) { - asyncIn.writePacket(new ByteArrayBuffer(message)).addListener(this); + asyncIn.writeBuffer(new ByteArrayBuffer(message)).addListener(this); } else { asyncIn.close(false); } @@ -622,7 +623,7 @@ 
public class ClientTest extends BaseTestSupport { ByteArrayOutputStream baosErr = new ByteArrayOutputStream(); try (ChannelExec channel = session.createExecChannel("test")) { - channel.setStreaming(ClientChannel.Streaming.Async); + channel.setStreaming(StreamingChannel.Streaming.Async); OpenFuture open = channel.open(); Thread.sleep(100L); // Removing this line will make the test succeed diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java index 7618206..f42ad61 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit; import org.apache.sshd.client.SshClient; import org.apache.sshd.client.channel.ChannelShell; -import org.apache.sshd.client.channel.ClientChannel; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.client.session.ClientSession; import org.apache.sshd.common.RuntimeSshException; @@ -273,7 +272,7 @@ public class WindowTest extends BaseTestSupport { session.auth().verify(AUTH_TIMEOUT); try (ChannelShell channel = session.createShellChannel()) { - channel.setStreaming(ClientChannel.Streaming.Async); + channel.setStreaming(StreamingChannel.Streaming.Async); channel.open().verify(OPEN_TIMEOUT); try (Channel serverChannel = GenericUtils.head(GenericUtils.head(sshd.getActiveSessions()) @@ -290,7 +289,7 @@ public class WindowTest extends BaseTestSupport { IoInputStream input = channel.getAsyncOut(); for (int i = 0; i < nbMessages; i++) { Buffer buffer = new ByteArrayBuffer(bytes); - output.writePacket(buffer).verify(DEFAULT_TIMEOUT); + output.writeBuffer(buffer).verify(DEFAULT_TIMEOUT); waitForWindowNotEquals(clientLocal, serverRemote, "client local", "server remote", TimeUnit.SECONDS.toMillis(3L)); diff --git 
a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java index 1715ed2..1bbe342 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java @@ -322,7 +322,10 @@ public class PortForwardingLoadTest extends BaseTestSupport { lastReport = readSize; } } catch (SocketTimeoutException e) { - throw new IOException("Error reading data at index " + readSize + "/" + dataBytes.length + " of iteration #" + i, e); + throw new IOException( + "Error reading data at index " + readSize + "/" + dataBytes.length + " of iteration #" + + i, + e); } } assertPayloadEquals("Mismatched payload at iteration #" + i, dataBytes, baos.toByteArray()); diff --git a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java index 35bd8b5..d8fd35f 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java @@ -389,7 +389,7 @@ public class AbstractSessionTest extends BaseTestSupport { } @Override - public IoWriteFuture writePacket(Buffer buffer) throws IOException { + public IoWriteFuture writeBuffer(Buffer buffer) throws IOException { if (!isOpen()) { throw new EOFException("Not open"); } diff --git a/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java b/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java index 0d09ec9..553142e 100644 --- a/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java @@ -132,7 +132,7 @@ public class ServerProxyAcceptorTest extends 
BaseTestSupport { client.setClientProxyConnector(session -> { IoSession ioSession = session.getIoSession(); - ioSession.writePacket(new ByteArrayBuffer(metaDataBytes)); + ioSession.writeBuffer(new ByteArrayBuffer(metaDataBytes)); }); client.start(); diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java index 9084a13..b550893 100644 --- a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java +++ b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java @@ -140,7 +140,7 @@ public class AsyncEchoShellFactory implements ShellFactory { if (buffer.charAt(i) == '\n') { String s = buffer.substring(0, i + 1); byte[] bytes = s.getBytes(StandardCharsets.UTF_8); - out.writePacket(new ByteArrayBuffer(bytes)).addListener(future -> { + out.writeBuffer(new ByteArrayBuffer(bytes)).addListener(future -> { Session session1 = channel.getSession(); if (future.isWritten()) { try { diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java index 8700e7a..80e576c 100644 --- a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java +++ b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java @@ -24,8 +24,9 @@ import org.apache.sshd.client.future.DefaultOpenFuture; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.channel.AbstractChannel; import org.apache.sshd.common.channel.Channel; -import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver; -import org.apache.sshd.common.io.PacketWriter; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriter; +import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver; +import org.apache.sshd.common.channel.throttle.DefaultChannelStreamWriter; import org.apache.sshd.common.util.buffer.Buffer; public 
class BogusChannel extends AbstractChannel { @@ -64,12 +65,13 @@ public class BogusChannel extends AbstractChannel { } @Override - public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() { - return ChannelStreamPacketWriterResolver.NONE; + public ChannelStreamWriterResolver getChannelStreamWriterResolver() { + return ChannelStreamWriterResolver.NONE; } @Override - public PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) { - return channel; + public ChannelStreamWriter resolveChannelStreamWriter(Channel channel, byte cmd) { + return new DefaultChannelStreamWriter(channel); } + } diff --git a/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java b/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java index af92e6e..04c7ea7 100644 --- a/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java +++ b/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java @@ -167,7 +167,7 @@ public class MinaSession extends AbstractInnerCloseable implements IoSession { } @Override // NOTE !!! 
data buffer may NOT be re-used when method returns - at least until IoWriteFuture is signalled - public IoWriteFuture writePacket(Buffer buffer) { + public IoWriteFuture writeBuffer(Buffer buffer) { return write(MinaSupport.asIoBuffer(buffer)); } diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java index 984c32d..01ab21d 100644 --- a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java @@ -136,7 +136,7 @@ public class NettyIoSession extends AbstractCloseable implements IoSession { } @Override - public IoWriteFuture writePacket(Buffer buffer) { + public IoWriteFuture writeBuffer(Buffer buffer) { int bufLen = buffer.available(); ByteBuf buf = Unpooled.buffer(bufLen); buf.writeBytes(buffer.array(), buffer.rpos(), bufLen); diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java index 9db3731..ba29ed4 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java @@ -288,7 +288,7 @@ public class DefaultSftpClient extends AbstractSftpClient { } IoOutputStream asyncIn = channel.getAsyncIn(); - IoWriteFuture writeFuture = asyncIn.writePacket(buf); + IoWriteFuture writeFuture = asyncIn.writeBuffer(buf); writeFuture.verify(); return id; } @@ -368,7 +368,7 @@ public class DefaultSftpClient extends AbstractSftpClient { if (traceEnabled) { log.trace("init({}) send SSH_FXP_INIT - initial version={}", clientChannel, initialVersion); } - IoWriteFuture writeFuture = asyncIn.writePacket(buf); + IoWriteFuture writeFuture = asyncIn.writeBuffer(buf); writeFuture.verify(); if (traceEnabled) { diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java 
b/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java index 034794f..5ad6869 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java @@ -943,7 +943,7 @@ public class SftpSubsystem @Override protected void send(Buffer buffer) throws IOException { BufferUtils.updateLengthPlaceholder(buffer, 0); - out.writePacket(buffer); + out.writeBuffer(buffer); } @Override