This is an automated email from the ASF dual-hosted git repository.

twolf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit c2a476094e2119971aa07be457e8a0aa1c0a3ec6
Author: Thomas Wolf <tw...@apache.org>
AuthorDate: Tue Aug 6 20:15:51 2024 +0200

    GH-524: Avoid unnecessary work in SFTP uploads
    
    Streamline ACK checking a bit. Do a quick check for the OK case first
    and do the full and proper parsing only if the status is not OK. Also
    avoid getting the IDLE_TIMEOUT repeatedly when draining remaining ACKs
    when SftpOutputStreamAsync is closed.
---
 .../sftp/client/impl/SftpOutputStreamAsync.java    | 39 +++++++++++++++++++---
 .../apache/sshd/sftp/client/impl/SftpResponse.java |  3 +-
 .../apache/sshd/sftp/client/impl/SftpStatus.java   |  8 ++++-
 3 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java
index bd9200083..c35c05d6e 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java
@@ -32,16 +32,19 @@ import java.util.Objects;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.helpers.PacketBuffer;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.output.OutputStreamWithChannel;
+import org.apache.sshd.core.CoreModuleProperties;
 import org.apache.sshd.sftp.client.SftpClient;
 import org.apache.sshd.sftp.client.SftpClient.CloseableHandle;
 import org.apache.sshd.sftp.client.SftpClient.OpenMode;
 import org.apache.sshd.sftp.client.SftpClientHolder;
 import org.apache.sshd.sftp.client.SftpMessage;
 import org.apache.sshd.sftp.common.SftpConstants;
+import org.apache.sshd.sftp.common.SftpException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -327,8 +330,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel implements Sf
             }
 
             pendingAcks.removeFirst();
-            SftpResponse response = 
SftpResponse.parse(SftpConstants.SSH_FXP_WRITE, buf);
-            client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, ack.id, 
SftpStatus.parse(response));
+            checkStatus(client, buf);
         }
 
         if (buffer == null) {
@@ -366,6 +368,27 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel implements Sf
         buffer = null;
     }
 
+    private void checkStatus(AbstractSftpClient client, Buffer buf) throws 
IOException {
+        if (buf.available() >= 13) {
+            int rpos = buf.rpos();
+            buf.rpos(rpos + 4); // Skip length
+            int cmd = buf.getUByte();
+            if (cmd != SftpConstants.SSH_FXP_STATUS) {
+                throw new SftpException(SftpConstants.SSH_FX_BAD_MESSAGE,
+                        "Unexpected SFTP response; expected SSH_FXP_STATUS but 
got "
+                                                                          + 
SftpConstants.getCommandMessageName(cmd));
+            }
+            buf.rpos(rpos + 9); // Skip ahead until after the id
+            if (buf.getInt() == SftpConstants.SSH_FX_OK) {
+                return;
+            }
+            // Reset and do the full parse
+            buf.rpos(rpos);
+        }
+        SftpResponse response = 
SftpResponse.parse(SftpConstants.SSH_FXP_WRITE, buf);
+        client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, 
response.getId(), SftpStatus.parse(response));
+    }
+
     @Override
     public void close() throws IOException {
         if (!isOpen()) {
@@ -388,6 +411,10 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel implements Sf
                     lastMsg = null;
                 }
 
+                Duration idleTimeout = 
CoreModuleProperties.IDLE_TIMEOUT.getRequired(getClient().getClientSession());
+                if (GenericUtils.isNegativeOrNull(idleTimeout)) {
+                    idleTimeout = 
CoreModuleProperties.IDLE_TIMEOUT.getRequiredDefault();
+                }
                 AbstractSftpClient client = getClient();
                 for (int ackIndex = 1; !pendingAcks.isEmpty(); ackIndex++) {
                     SftpAckData ack = pendingAcks.removeFirst();
@@ -395,11 +422,15 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel implements Sf
                         log.debug("close({}) processing ack #{}: {}", this, 
ackIndex, ack);
                     }
 
-                    SftpResponse response = 
client.response(SftpConstants.SSH_FXP_WRITE, ack.id);
+                    Buffer buf = client.receive(ack.id, idleTimeout);
+                    if (buf == null) {
+                        log.debug("close({}) no ack response for {}", this, 
ack);
+                        break;
+                    }
                     if (debugEnabled) {
                         log.debug("close({}) processing ack #{} response for 
{}", this, ackIndex, ack);
                     }
-                    client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, 
response.getId(), SftpStatus.parse(response));
+                    checkStatus(client, buf);
                 }
             } finally {
                 if (ownsHandle) {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java
index fd8787d9f..3ce0839fc 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java
@@ -71,7 +71,8 @@ public final class SftpResponse {
         int length = buffer.getInt();
         int type = buffer.getUByte();
         int id = buffer.getInt();
-        validateIncomingResponse(cmd, id, type, length, buffer);
+        // No need to validate the length here: the way we assemble these 
buffers guarantees that
+        // the length is reasonable and does not exceed buffer.available().
         return new SftpResponse(cmd, id, type, length, buffer);
     }
 
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java
index 5d1511fcf..93fea737d 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java
@@ -20,6 +20,7 @@ package org.apache.sshd.sftp.client.impl;
 
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.sftp.common.SftpConstants;
+import org.apache.sshd.sftp.common.SftpException;
 
 /**
  * A representation of a SSH_FXP_STATUS record.
@@ -69,7 +70,12 @@ public final class SftpStatus {
         return new SftpStatus(code, message, language);
     }
 
-    public static SftpStatus parse(SftpResponse response) {
+    public static SftpStatus parse(SftpResponse response) throws SftpException 
{
+        if (response.getType() != SftpConstants.SSH_FXP_STATUS) {
+            throw new SftpException(
+                    SftpConstants.SSH_FX_BAD_MESSAGE, "Unexpected SFTP 
response: expected SSH_FXP_STATUS but got "
+                                                      + 
SftpConstants.getCommandMessageName(response.getType()));
+        }
         return parse(response.getBuffer());
     }
 }

Reply via email to