This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit 6931ecec7085241b66192bb38ec97a8965881f95 Author: Lyor Goldstein <lgoldst...@apache.org> AuthorDate: Tue Nov 5 19:38:08 2019 +0200 Provide configurable control over the client-side ChannelSession stdin pump chunk size --- CHANGES.md | 2 + .../org/apache/sshd/common/util/io/IoUtils.java | 51 ++++++++++++++-------- .../apache/sshd/client/channel/ChannelSession.java | 45 ++++++++++++------- .../apache/sshd/server/channel/ChannelSession.java | 6 ++- 4 files changed, 69 insertions(+), 35 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 983f320..dfacf54 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -67,6 +67,8 @@ the message type=30 (old request). * The `MacInformation` interface has an extra `isEncryptThenMac` method (default=_false_) to enable distinction of this mode. +* Provide configurable control over the client-side `ChannelSession` _stdin_ pump chunk size. + ## Behavioral changes and enhancements * [SSHD-926](https://issues.apache.org/jira/browse/SSHD-930) - Add support for OpenSSH 'lsets...@openssh.com' SFTP protocol extension. diff --git a/sshd-common/src/main/java/org/apache/sshd/common/util/io/IoUtils.java b/sshd-common/src/main/java/org/apache/sshd/common/util/io/IoUtils.java index 910a6a3..2fc5f72 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/util/io/IoUtils.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/util/io/IoUtils.java @@ -145,7 +145,9 @@ public final class IoUtils { */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public static IOException closeQuietly(Closeable... closeables) { - return closeQuietly(GenericUtils.isEmpty(closeables) ? Collections.emptyList() : Arrays.asList(closeables)); + return closeQuietly(GenericUtils.isEmpty(closeables) + ? 
Collections.emptyList() + : Arrays.asList(closeables)); } /** @@ -303,22 +305,23 @@ public final class IoUtils { * owner if <U>any</U> of relevant the owner/group/others permission is set */ public static void setPermissionsToFile(File f, Collection<PosixFilePermission> perms) { - boolean readable = perms != null - && (perms.contains(PosixFilePermission.OWNER_READ) - || perms.contains(PosixFilePermission.GROUP_READ) - || perms.contains(PosixFilePermission.OTHERS_READ)); + boolean havePermissions = GenericUtils.isNotEmpty(perms); + boolean readable = havePermissions + && (perms.contains(PosixFilePermission.OWNER_READ) + || perms.contains(PosixFilePermission.GROUP_READ) + || perms.contains(PosixFilePermission.OTHERS_READ)); f.setReadable(readable, false); - boolean writable = perms != null - && (perms.contains(PosixFilePermission.OWNER_WRITE) - || perms.contains(PosixFilePermission.GROUP_WRITE) - || perms.contains(PosixFilePermission.OTHERS_WRITE)); + boolean writable = havePermissions + && (perms.contains(PosixFilePermission.OWNER_WRITE) + || perms.contains(PosixFilePermission.GROUP_WRITE) + || perms.contains(PosixFilePermission.OTHERS_WRITE)); f.setWritable(writable, false); - boolean executable = perms != null - && (perms.contains(PosixFilePermission.OWNER_EXECUTE) - || perms.contains(PosixFilePermission.GROUP_EXECUTE) - || perms.contains(PosixFilePermission.OTHERS_EXECUTE)); + boolean executable = havePermissions + && (perms.contains(PosixFilePermission.OWNER_EXECUTE) + || perms.contains(PosixFilePermission.GROUP_EXECUTE) + || perms.contains(PosixFilePermission.OTHERS_EXECUTE)); f.setExecutable(executable, false); } @@ -401,7 +404,9 @@ public final class IoUtils { * @throws IOException if there is a problem reading the file * @throws EOFException if the number of bytes read was incorrect */ - public static void readFully(InputStream input, byte[] buffer, int offset, int length) throws IOException { + public static void readFully( + InputStream input, byte[] 
buffer, int offset, int length) + throws IOException { int actual = read(input, buffer, offset, length); if (actual != length) { throw new EOFException("Premature EOF - expected=" + length + ", actual=" + actual); @@ -430,7 +435,9 @@ public final class IoUtils { * @return actual length read; may be less than requested if EOF was reached * @throws IOException if a read error occurs */ - public static int read(InputStream input, byte[] buffer, int offset, int length) throws IOException { + public static int read( + InputStream input, byte[] buffer, int offset, int length) + throws IOException { for (int remaining = length, curOffset = offset; remaining > 0;) { int count = input.read(buffer, curOffset, remaining); if (count == -1) { // EOF before achieved required length @@ -450,7 +457,8 @@ public final class IoUtils { * @return The violating {@link PosixFilePermission} - {@code null} * if no violating permission found */ - public static PosixFilePermission validateExcludedPermissions(Collection<PosixFilePermission> perms, Collection<PosixFilePermission> excluded) { + public static PosixFilePermission validateExcludedPermissions( + Collection<PosixFilePermission> perms, Collection<PosixFilePermission> excluded) { if (GenericUtils.isEmpty(perms) || GenericUtils.isEmpty(excluded)) { return null; } @@ -474,6 +482,7 @@ public final class IoUtils { if (!Files.isDirectory(path, options)) { throw new UnsupportedOperationException("Not a directory: " + path); } + return path; } @@ -505,7 +514,9 @@ public final class IoUtils { return prefix; } - StringBuilder sb = new StringBuilder(prefix.length() + component.length() + File.separator.length()).append(prefix); + StringBuilder sb = new StringBuilder( + prefix.length() + component.length() + File.separator.length()) + .append(prefix); if (sb.charAt(prefix.length() - 1) == File.separatorChar) { if (component.charAt(0) == File.separatorChar) { @@ -554,13 +565,15 @@ public final class IoUtils { * @see #readAllLines(Reader) */ 
public static List<String> readAllLines(InputStream stream) throws IOException { - try (Reader reader = new InputStreamReader(Objects.requireNonNull(stream, "No stream instance"), StandardCharsets.UTF_8)) { + try (Reader reader = new InputStreamReader( + Objects.requireNonNull(stream, "No stream instance"), StandardCharsets.UTF_8)) { return readAllLines(reader); } } public static List<String> readAllLines(Reader reader) throws IOException { - try (BufferedReader br = new BufferedReader(Objects.requireNonNull(reader, "No reader instance"), DEFAULT_COPY_SIZE)) { + try (BufferedReader br = new BufferedReader( + Objects.requireNonNull(reader, "No reader instance"), DEFAULT_COPY_SIZE)) { return readAllLines(br); } } diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java index dfe7f40..6612079 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.util.concurrent.Future; import org.apache.sshd.common.Closeable; +import org.apache.sshd.common.PropertyResolverUtils; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelAsyncInputStream; import org.apache.sshd.common.channel.ChannelAsyncOutputStream; @@ -39,11 +40,22 @@ import org.apache.sshd.common.util.threads.CloseableExecutorService; import org.apache.sshd.common.util.threads.ThreadUtils; /** - * TODO Add javadoc + * Client side channel session * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public class ChannelSession extends AbstractClientChannel { + /** + * On some platforms, a call to {@code System.in.read(new byte[65536], 0, 32768)} + * always throws an {@link IOException}. So we need to protect against that and chunk + * the call into smaller calls. 
This problem was found on Windows, JDK 1.6.0_03-b05. + */ + public static final String INPUT_STREAM_PUMP_CHUNK_SIZE = "stdin-pump-chunk-size"; + + /** + * Default (and also <U>minimum</U>) value of {@value #INPUT_STREAM_PUMP_CHUNK_SIZE} + */ + public static final int DEFAULT_INPUT_STREAM_PUMP_CHUNK_SIZE = 1024; private CloseableExecutorService pumperService; private Future<?> pumper; @@ -93,7 +105,7 @@ public class ChannelSession extends AbstractClientChannel { CloseableExecutorService service = getExecutorService(); if (service == null) { pumperService = ThreadUtils.newSingleThreadExecutor( - "ClientInputStreamPump[" + this.toString() + "]"); + "ClientInputStreamPump[" + this + "]"); } else { pumperService = ThreadUtils.noClose(service); } @@ -164,11 +176,15 @@ public class ChannelSession extends AbstractClientChannel { Session session = getSession(); Window wRemote = getRemoteWindow(); long packetSize = wRemote.getPacketSize(); - ValidateUtils.checkTrue(packetSize < Integer.MAX_VALUE, - "Remote packet size exceeds int boundary: %d", packetSize); + ValidateUtils.checkTrue((packetSize > 0) && (packetSize < Integer.MAX_VALUE), + "Invalid remote packet size int boundary: %d", packetSize); byte[] buffer = new byte[(int) packetSize]; + int maxChunkSize = PropertyResolverUtils.getIntProperty( + session, INPUT_STREAM_PUMP_CHUNK_SIZE, DEFAULT_INPUT_STREAM_PUMP_CHUNK_SIZE); + maxChunkSize = Math.max(maxChunkSize, DEFAULT_INPUT_STREAM_PUMP_CHUNK_SIZE); + while (!closeFuture.isClosed()) { - int len = securedRead(in, buffer, 0, buffer.length); + int len = securedRead(in, maxChunkSize, buffer, 0, buffer.length); if (len < 0) { if (log.isDebugEnabled()) { log.debug("pumpInputStream({}) EOF signalled", this); @@ -201,24 +217,23 @@ public class ChannelSession extends AbstractClientChannel { } } - // - // On some platforms, a call to System.in.read(new byte[65536], 0,32768) always throws an IOException. - // So we need to protect against that and chunk the call into smaller calls. 
- // This problem was found on Windows, JDK 1.6.0_03-b05. - // - protected int securedRead(InputStream in, byte[] buf, int off, int len) throws IOException { - int n = 0; - for (;;) { - int nread = in.read(buf, off + n, Math.min(1024, len - n)); + protected int securedRead( + InputStream in, int maxChunkSize, byte[] buf, int off, int len) + throws IOException { + for (int n = 0;;) { + int nread = in.read(buf, off + n, Math.min(maxChunkSize, len - n)); if (nread <= 0) { return (n == 0) ? nread : n; } + n += nread; if (n >= len) { return n; } + // if not closed but no bytes available, return - if (in.available() <= 0) { + int availLen = in.available(); + if (availLen <= 0) { return n; } } diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index 6c02cc2..7bce3f2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -77,7 +77,7 @@ import org.apache.sshd.server.subsystem.SubsystemFactory; import org.apache.sshd.server.x11.X11ForwardSupport; /** - * TODO Add javadocWindowInitTest + * Server side channel session * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ @@ -90,6 +90,10 @@ public class ChannelSession extends AbstractServerChannel { * until a {@link ChannelDataReceiver} for the data is registered */ public static final String MAX_EXTDATA_BUFSIZE = "channel-session-max-extdata-bufsize"; + + /** + * Default value of {@value #MAX_EXTDATA_BUFSIZE} + */ public static final int DEFAULT_MAX_EXTDATA_BUFSIZE = 0; protected String type;