This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push: new a0fcbf8 [SSHD-1022] Fixed NPE SftpOutputStreamAsync#flush() if invoked with no bytes written in-between a0fcbf8 is described below commit a0fcbf8e8756e2de6c39c25c9c9959e6c7306e3c Author: Lyor Goldstein <lgoldst...@apache.org> AuthorDate: Thu Jun 25 21:22:38 2020 +0300 [SSHD-1022] Fixed NPE SftpOutputStreamAsync#flush() if invoked with no bytes written in-between --- CHANGES.md | 2 + .../subsystem/sftp/impl/SftpOutputStreamAsync.java | 131 ++++++++++++++++----- .../sshd/client/subsystem/sftp/SftpTest.java | 43 ++++++- 3 files changed, 146 insertions(+), 30 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 930ad69..24b15a3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -13,3 +13,5 @@ ## Minor code helpers ## Behavioral changes and enhancements + +* [SSHD-1022](https://issues.apache.org/jira/browse/SSHD-1022) NPE in `SftpOutputStreamAsync#flush()` if no data written in between. diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java index b5b809c..b442ffe 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java @@ -33,6 +33,8 @@ import org.apache.sshd.common.subsystem.sftp.SftpConstants; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.io.OutputStreamWithChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implements an output stream for a given remote file @@ -40,6 +42,7 @@ import org.apache.sshd.common.util.io.OutputStreamWithChannel; * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public class SftpOutputStreamAsync extends OutputStreamWithChannel { + protected final Logger log; protected final byte[] bb = new byte[1]; protected final int bufferSize; protected Buffer buffer; @@ -47,12 +50,13 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { protected long offset; protected final Deque<SftpAckData> pendingWrites = new LinkedList<>(); - private final AbstractSftpClient client; + private final AbstractSftpClient clientInstance; private final String path; public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize, String path, Collection<OpenMode> mode) throws IOException { - this.client = Objects.requireNonNull(client, "No SFTP client instance"); + this.log = LoggerFactory.getLogger(getClass()); + this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance"); this.path = path; this.handle = client.open(path, mode); this.bufferSize = bufferSize; @@ -60,7 +64,8 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize, String path, CloseableHandle handle) throws IOException { - this.client = Objects.requireNonNull(client, "No SFTP client instance"); + this.log = LoggerFactory.getLogger(getClass()); + this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance"); this.path = path; this.handle = handle; this.bufferSize = bufferSize; @@ -72,7 +77,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { * @return {@link SftpClient} instance used to access the remote file */ public final AbstractSftpClient getClient() { - return client; + return clientInstance; } public void setOffset(long offset) { @@ -102,23 +107,39 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { @Override public void write(byte[] b, int off, int len) throws IOException { byte[] id = handle.getIdentifier(); + SftpClient client = getClient(); Session session = client.getSession(); + boolean traceEnabled = log.isTraceEnabled(); + int writtenCount = 0; + int totalLen = len; do { if (buffer == null) { + if (traceEnabled) { + log.trace("write({}) allocate buffer size={} after {}/{} bytes", + this, bufferSize, writtenCount, totalLen); + } + buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize); int hdr = 9 + 16 + 8 + id.length + buffer.wpos(); buffer.rpos(hdr); buffer.wpos(hdr); } + int max = bufferSize - (9 + 16 + id.length + 72); int nb = Math.min(len, max - (buffer.wpos() - buffer.rpos())); buffer.putRawBytes(b, off, nb); + + off += nb; + len -= nb; + writtenCount += nb; + if (buffer.available() == max) { + if (traceEnabled) { + log.trace("write({}) flush after {}/{} bytes", this, writtenCount, totalLen); + } flush(); } - off += nb; - len -= nb; } while (len > 0); } @@ -128,19 +149,43 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { throw new IOException("flush(" + getPath() + ") stream is closed"); } - for (;;) { + boolean debugEnabled = log.isDebugEnabled(); + AbstractSftpClient client = getClient(); + for (int ackIndex = 0;;) { SftpAckData ack = pendingWrites.peek(); - if (ack != null) { - Buffer response = client.receive(ack.id, 0L); - if (response != null) { - pendingWrites.removeFirst(); - client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response); - } else { - break; + if (ack == null) { + if (debugEnabled) { + log.debug("flush({}) processed {} pending writes", this, ackIndex); } - } else { break; } + + ackIndex++; + if (debugEnabled) { + log.debug("flush({}) waiting for ack #{}: {}", this, ackIndex, ack); + } + + Buffer response = client.receive(ack.id, 0L); + if (response == null) { + if (debugEnabled) { + log.debug("flush({}) no response for ack #{}: {}", this, ackIndex, ack); + } + break; + } + + if (debugEnabled) { + log.debug("flush({}) processing ack #{}: {}", this, ackIndex, ack); + } + + ack = pendingWrites.removeFirst(); + client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response); + } + + if (buffer == null) { + if (debugEnabled) { + log.debug("flush({}) no pending buffer to flush", this); + } + return; } byte[] id = handle.getIdentifier(); @@ -163,7 +208,11 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { } int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf); - pendingWrites.add(new SftpAckData(reqId, offset, avail)); + SftpAckData ack = new SftpAckData(reqId, offset, avail); + if (debugEnabled) { + log.debug("flush({}) enueue pending ack={}", this, ack); + } + pendingWrites.add(ack); offset += avail; buffer = null; @@ -171,23 +220,51 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { @Override public void close() throws IOException { - if (isOpen()) { + if (!isOpen()) { + return; + } + + try { + boolean debugEnabled = log.isDebugEnabled(); + try { - try { - if ((buffer != null) && (buffer.available() > 0)) { - flush(); + int pendingSize = (buffer == null) ? 0 : buffer.available(); + if (pendingSize > 0) { + if (debugEnabled) { + log.debug("close({}) flushing {} pending bytes", this, pendingSize); } - while (!pendingWrites.isEmpty()) { - SftpAckData ack = pendingWrites.removeFirst(); - Buffer response = client.receive(ack.id); - client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response); + flush(); + } + + AbstractSftpClient client = getClient(); + for (int ackIndex = 1; !pendingWrites.isEmpty(); ackIndex++) { + SftpAckData ack = pendingWrites.removeFirst(); + if (debugEnabled) { + log.debug("close({}) processing ack #{}: {}", this, ackIndex, ack); } - } finally { - handle.close(); + + Buffer response = client.receive(ack.id); + if (debugEnabled) { + log.debug("close({}) processing ack #{} response for {}", this, ackIndex, ack); + } + client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response); } } finally { - handle = null; + if (debugEnabled) { + log.debug("close({}) closing file handle", this); + } + handle.close(); } + } finally { + handle = null; } } + + @Override + public String toString() { + SftpClient client = getClient(); + return getClass().getSimpleName() + + "[" + client.getSession() + "]" + + "[" + getPath() + "]"; + } } diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java index e155ff1..35d6e5e 100644 --- a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java +++ b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java @@ -67,6 +67,7 @@ import org.apache.sshd.client.subsystem.sftp.extensions.BuiltinSftpClientExtensi import org.apache.sshd.client.subsystem.sftp.extensions.SftpClientExtension; import org.apache.sshd.client.subsystem.sftp.impl.AbstractSftpClient; import org.apache.sshd.client.subsystem.sftp.impl.DefaultCloseableHandle; +import org.apache.sshd.client.subsystem.sftp.impl.SftpOutputStreamAsync; import org.apache.sshd.common.Factory; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.OptionalFeature; @@ -1608,14 +1609,14 @@ public class SftpTest extends AbstractSftpClientTestSupport { protected void sendFile(String path, String data) throws Exception { ChannelSftp c = (ChannelSftp) session.openChannel(SftpConstants.SFTP_SUBSYSTEM_NAME); c.connect(); - try { - c.put(new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)), path); + try (InputStream srcStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))) { + c.put(srcStream, path); } finally { c.disconnect(); } } - private String randomString(int size) { + private static String randomString(int size) { StringBuilder sb = new StringBuilder(size); for (int i = 0; i < size; i++) { sb.append((char) ((i % 10) + '0')); @@ -1623,6 +1624,42 @@ public class SftpTest extends AbstractSftpClientTestSupport { return sb.toString(); } + @Test // see SSHD-1022 + public void testFlushOutputStreamWithoutWrite() throws Exception { + Path targetPath = detectTargetFolder(); + Path lclSftp = CommonTestSupportUtils.resolve( + targetPath, SftpConstants.SFTP_SUBSYSTEM_NAME, getClass().getSimpleName(), getCurrentTestName()); + CommonTestSupportUtils.deleteRecursive(lclSftp); + + Path parentPath = targetPath.getParent(); + Path clientFolder = assertHierarchyTargetFolderExists(lclSftp.resolve("client")); + try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port) + .verify(CONNECT_TIMEOUT).getSession()) { + session.addPasswordIdentity(getCurrentTestName()); + session.auth().verify(AUTH_TIMEOUT); + + try (SftpClient sftp = createSftpClient(session)) { + Path file = clientFolder.resolve("file.txt"); + String filePath = CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, file); + try (OutputStream os = sftp.write(filePath, SftpClient.MIN_WRITE_BUFFER_SIZE)) { + assertObjectInstanceOf(SftpOutputStreamAsync.class.getSimpleName(), SftpOutputStreamAsync.class, os); + + for (int index = 1; index <= 5; index++) { + outputDebugMessage("%s - pre write flush attempt #%d", getCurrentTestName(), index); + os.flush(); + } + + os.write((getCurrentTestName() + "\n").getBytes(StandardCharsets.UTF_8)); + + for (int index = 1; index <= 5; index++) { + outputDebugMessage("%s - post write flush attempt #%d", getCurrentTestName(), index); + os.flush(); + } + } + } + } + } + static class LinkData { private final Path source; private final Path target;