This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git


The following commit(s) were added to refs/heads/master by this push:
     new a0fcbf8  [SSHD-1022] Fixed NPE SftpOutputStreamAsync#flush() if invoked with no bytes written in-between
a0fcbf8 is described below

commit a0fcbf8e8756e2de6c39c25c9c9959e6c7306e3c
Author: Lyor Goldstein <lgoldst...@apache.org>
AuthorDate: Thu Jun 25 21:22:38 2020 +0300

    [SSHD-1022] Fixed NPE SftpOutputStreamAsync#flush() if invoked with no bytes written in-between
---
 CHANGES.md                                         |   2 +
 .../subsystem/sftp/impl/SftpOutputStreamAsync.java | 131 ++++++++++++++++-----
 .../sshd/client/subsystem/sftp/SftpTest.java       |  43 ++++++-
 3 files changed, 146 insertions(+), 30 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 930ad69..24b15a3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,3 +13,5 @@
 ## Minor code helpers
 
 ## Behavioral changes and enhancements
+
+* [SSHD-1022](https://issues.apache.org/jira/browse/SSHD-1022) NPE in `SftpOutputStreamAsync#flush()` if no data written in between.
diff --git 
a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
 
b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
index b5b809c..b442ffe 100644
--- 
a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
+++ 
b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
@@ -33,6 +33,8 @@ import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.OutputStreamWithChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implements an output stream for a given remote file
@@ -40,6 +42,7 @@ import org.apache.sshd.common.util.io.OutputStreamWithChannel;
  * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class SftpOutputStreamAsync extends OutputStreamWithChannel {
+    protected final Logger log;
     protected final byte[] bb = new byte[1];
     protected final int bufferSize;
     protected Buffer buffer;
@@ -47,12 +50,13 @@ public class SftpOutputStreamAsync extends 
OutputStreamWithChannel {
     protected long offset;
     protected final Deque<SftpAckData> pendingWrites = new LinkedList<>();
 
-    private final AbstractSftpClient client;
+    private final AbstractSftpClient clientInstance;
     private final String path;
 
     public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
                                  String path, Collection<OpenMode> mode) 
throws IOException {
-        this.client = Objects.requireNonNull(client, "No SFTP client 
instance");
+        this.log = LoggerFactory.getLogger(getClass());
+        this.clientInstance = Objects.requireNonNull(client, "No SFTP client 
instance");
         this.path = path;
         this.handle = client.open(path, mode);
         this.bufferSize = bufferSize;
@@ -60,7 +64,8 @@ public class SftpOutputStreamAsync extends 
OutputStreamWithChannel {
 
     public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
                                  String path, CloseableHandle handle) throws 
IOException {
-        this.client = Objects.requireNonNull(client, "No SFTP client 
instance");
+        this.log = LoggerFactory.getLogger(getClass());
+        this.clientInstance = Objects.requireNonNull(client, "No SFTP client 
instance");
         this.path = path;
         this.handle = handle;
         this.bufferSize = bufferSize;
@@ -72,7 +77,7 @@ public class SftpOutputStreamAsync extends 
OutputStreamWithChannel {
      * @return {@link SftpClient} instance used to access the remote file
      */
     public final AbstractSftpClient getClient() {
-        return client;
+        return clientInstance;
     }
 
     public void setOffset(long offset) {
@@ -102,23 +107,39 @@ public class SftpOutputStreamAsync extends 
OutputStreamWithChannel {
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
         byte[] id = handle.getIdentifier();
+        SftpClient client = getClient();
         Session session = client.getSession();
 
+        boolean traceEnabled = log.isTraceEnabled();
+        int writtenCount = 0;
+        int totalLen = len;
         do {
             if (buffer == null) {
+                if (traceEnabled) {
+                    log.trace("write({}) allocate buffer size={} after {}/{} 
bytes",
+                            this, bufferSize, writtenCount, totalLen);
+                }
+
                 buffer = 
session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
                 int hdr = 9 + 16 + 8 + id.length + buffer.wpos();
                 buffer.rpos(hdr);
                 buffer.wpos(hdr);
             }
+
             int max = bufferSize - (9 + 16 + id.length + 72);
             int nb = Math.min(len, max - (buffer.wpos() - buffer.rpos()));
             buffer.putRawBytes(b, off, nb);
+
+            off += nb;
+            len -= nb;
+            writtenCount += nb;
+
             if (buffer.available() == max) {
+                if (traceEnabled) {
+                    log.trace("write({}) flush after {}/{} bytes", this, 
writtenCount, totalLen);
+                }
                 flush();
             }
-            off += nb;
-            len -= nb;
         } while (len > 0);
     }
 
@@ -128,19 +149,43 @@ public class SftpOutputStreamAsync extends 
OutputStreamWithChannel {
             throw new IOException("flush(" + getPath() + ") stream is closed");
         }
 
-        for (;;) {
+        boolean debugEnabled = log.isDebugEnabled();
+        AbstractSftpClient client = getClient();
+        for (int ackIndex = 0;;) {
             SftpAckData ack = pendingWrites.peek();
-            if (ack != null) {
-                Buffer response = client.receive(ack.id, 0L);
-                if (response != null) {
-                    pendingWrites.removeFirst();
-                    client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, 
response);
-                } else {
-                    break;
+            if (ack == null) {
+                if (debugEnabled) {
+                    log.debug("flush({}) processed {} pending writes", this, 
ackIndex);
                 }
-            } else {
                 break;
             }
+
+            ackIndex++;
+            if (debugEnabled) {
+                log.debug("flush({}) waiting for ack #{}: {}", this, ackIndex, 
ack);
+            }
+
+            Buffer response = client.receive(ack.id, 0L);
+            if (response == null) {
+                if (debugEnabled) {
+                    log.debug("flush({}) no response for ack #{}: {}", this, 
ackIndex, ack);
+                }
+                break;
+            }
+
+            if (debugEnabled) {
+                log.debug("flush({}) processing ack #{}: {}", this, ackIndex, 
ack);
+            }
+
+            ack = pendingWrites.removeFirst();
+            client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
+        }
+
+        if (buffer == null) {
+            if (debugEnabled) {
+                log.debug("flush({}) no pending buffer to flush", this);
+            }
+            return;
         }
 
         byte[] id = handle.getIdentifier();
@@ -163,7 +208,11 @@ public class SftpOutputStreamAsync extends 
OutputStreamWithChannel {
         }
 
         int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf);
-        pendingWrites.add(new SftpAckData(reqId, offset, avail));
+        SftpAckData ack = new SftpAckData(reqId, offset, avail);
+        if (debugEnabled) {
+            log.debug("flush({}) enueue pending ack={}", this, ack);
+        }
+        pendingWrites.add(ack);
 
         offset += avail;
         buffer = null;
@@ -171,23 +220,51 @@ public class SftpOutputStreamAsync extends 
OutputStreamWithChannel {
 
     @Override
     public void close() throws IOException {
-        if (isOpen()) {
+        if (!isOpen()) {
+            return;
+        }
+
+        try {
+            boolean debugEnabled = log.isDebugEnabled();
+
             try {
-                try {
-                    if ((buffer != null) && (buffer.available() > 0)) {
-                        flush();
+                int pendingSize = (buffer == null) ? 0 : buffer.available();
+                if (pendingSize > 0) {
+                    if (debugEnabled) {
+                        log.debug("close({}) flushing {} pending bytes", this, 
pendingSize);
                     }
-                    while (!pendingWrites.isEmpty()) {
-                        SftpAckData ack = pendingWrites.removeFirst();
-                        Buffer response = client.receive(ack.id);
-                        
client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
+                    flush();
+                }
+
+                AbstractSftpClient client = getClient();
+                for (int ackIndex = 1; !pendingWrites.isEmpty(); ackIndex++) {
+                    SftpAckData ack = pendingWrites.removeFirst();
+                    if (debugEnabled) {
+                        log.debug("close({}) processing ack #{}: {}", this, 
ackIndex, ack);
                     }
-                } finally {
-                    handle.close();
+
+                    Buffer response = client.receive(ack.id);
+                    if (debugEnabled) {
+                        log.debug("close({}) processing ack #{} response for 
{}", this, ackIndex, ack);
+                    }
+                    client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, 
response);
                 }
             } finally {
-                handle = null;
+                if (debugEnabled) {
+                    log.debug("close({}) closing file handle", this);
+                }
+                handle.close();
             }
+        } finally {
+            handle = null;
         }
     }
+
+    @Override
+    public String toString() {
+        SftpClient client = getClient();
+        return getClass().getSimpleName()
+               + "[" + client.getSession() + "]"
+               + "[" + getPath() + "]";
+    }
 }
diff --git 
a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java 
b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
index e155ff1..35d6e5e 100644
--- 
a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
+++ 
b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
@@ -67,6 +67,7 @@ import 
org.apache.sshd.client.subsystem.sftp.extensions.BuiltinSftpClientExtensi
 import org.apache.sshd.client.subsystem.sftp.extensions.SftpClientExtension;
 import org.apache.sshd.client.subsystem.sftp.impl.AbstractSftpClient;
 import org.apache.sshd.client.subsystem.sftp.impl.DefaultCloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.impl.SftpOutputStreamAsync;
 import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.OptionalFeature;
@@ -1608,14 +1609,14 @@ public class SftpTest extends 
AbstractSftpClientTestSupport {
     protected void sendFile(String path, String data) throws Exception {
         ChannelSftp c = (ChannelSftp) 
session.openChannel(SftpConstants.SFTP_SUBSYSTEM_NAME);
         c.connect();
-        try {
-            c.put(new 
ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)), path);
+        try (InputStream srcStream = new 
ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))) {
+            c.put(srcStream, path);
         } finally {
             c.disconnect();
         }
     }
 
-    private String randomString(int size) {
+    private static String randomString(int size) {
         StringBuilder sb = new StringBuilder(size);
         for (int i = 0; i < size; i++) {
             sb.append((char) ((i % 10) + '0'));
@@ -1623,6 +1624,42 @@ public class SftpTest extends 
AbstractSftpClientTestSupport {
         return sb.toString();
     }
 
+    @Test   // see SSHD-1022
+    public void testFlushOutputStreamWithoutWrite() throws Exception {
+        Path targetPath = detectTargetFolder();
+        Path lclSftp = CommonTestSupportUtils.resolve(
+                targetPath, SftpConstants.SFTP_SUBSYSTEM_NAME, 
getClass().getSimpleName(), getCurrentTestName());
+        CommonTestSupportUtils.deleteRecursive(lclSftp);
+
+        Path parentPath = targetPath.getParent();
+        Path clientFolder = 
assertHierarchyTargetFolderExists(lclSftp.resolve("client"));
+        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port)
+                .verify(CONNECT_TIMEOUT).getSession()) {
+            session.addPasswordIdentity(getCurrentTestName());
+            session.auth().verify(AUTH_TIMEOUT);
+
+            try (SftpClient sftp = createSftpClient(session)) {
+                Path file = clientFolder.resolve("file.txt");
+                String filePath = 
CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, file);
+                try (OutputStream os = sftp.write(filePath, 
SftpClient.MIN_WRITE_BUFFER_SIZE)) {
+                    
assertObjectInstanceOf(SftpOutputStreamAsync.class.getSimpleName(), 
SftpOutputStreamAsync.class, os);
+
+                    for (int index = 1; index <= 5; index++) {
+                        outputDebugMessage("%s - pre write flush attempt #%d", 
getCurrentTestName(), index);
+                        os.flush();
+                    }
+
+                    os.write((getCurrentTestName() + 
"\n").getBytes(StandardCharsets.UTF_8));
+
+                    for (int index = 1; index <= 5; index++) {
+                        outputDebugMessage("%s - post write flush attempt 
#%d", getCurrentTestName(), index);
+                        os.flush();
+                    }
+                }
+            }
+        }
+    }
+
     static class LinkData {
         private final Path source;
         private final Path target;

Reply via email to