This is an automated email from the ASF dual-hosted git repository. twolf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit c2a476094e2119971aa07be457e8a0aa1c0a3ec6 Author: Thomas Wolf <tw...@apache.org> AuthorDate: Tue Aug 6 20:15:51 2024 +0200 GH-524: Avoid unnecessary work in SFTP uploads Streamline ACK checking a bit. Do a quick check for the OK case first and do the full and proper parsing only if the status is not OK. Also avoid getting the IDLE_TIMEOUT repeatedly when draining remaining ACKs when SftpOutputStreamAsync is closed. --- .../sftp/client/impl/SftpOutputStreamAsync.java | 39 +++++++++++++++++++--- .../apache/sshd/sftp/client/impl/SftpResponse.java | 3 +- .../apache/sshd/sftp/client/impl/SftpStatus.java | 8 ++++- 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java index bd9200083..c35c05d6e 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java @@ -32,16 +32,19 @@ import java.util.Objects; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.session.helpers.PacketBuffer; +import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.io.output.OutputStreamWithChannel; +import org.apache.sshd.core.CoreModuleProperties; import org.apache.sshd.sftp.client.SftpClient; import org.apache.sshd.sftp.client.SftpClient.CloseableHandle; import org.apache.sshd.sftp.client.SftpClient.OpenMode; import org.apache.sshd.sftp.client.SftpClientHolder; import org.apache.sshd.sftp.client.SftpMessage; import org.apache.sshd.sftp.common.SftpConstants; +import org.apache.sshd.sftp.common.SftpException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -327,8 +330,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel implements Sf } pendingAcks.removeFirst(); - SftpResponse response = SftpResponse.parse(SftpConstants.SSH_FXP_WRITE, buf); - client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, ack.id, SftpStatus.parse(response)); + checkStatus(client, buf); } if (buffer == null) { @@ -366,6 +368,27 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel implements Sf buffer = null; } + private void checkStatus(AbstractSftpClient client, Buffer buf) throws IOException { + if (buf.available() >= 13) { + int rpos = buf.rpos(); + buf.rpos(rpos + 4); // Skip length + int cmd = buf.getUByte(); + if (cmd != SftpConstants.SSH_FXP_STATUS) { + throw new SftpException(SftpConstants.SSH_FX_BAD_MESSAGE, + "Unexpected SFTP response; expected SSH_FXP_STATUS but got " + + SftpConstants.getCommandMessageName(cmd)); + } + buf.rpos(rpos + 9); // Skip ahead until after the id + if (buf.getInt() == SftpConstants.SSH_FX_OK) { + return; + } + // Reset and do the full parse + buf.rpos(rpos); + } + SftpResponse response = SftpResponse.parse(SftpConstants.SSH_FXP_WRITE, buf); + client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response.getId(), SftpStatus.parse(response)); + } + @Override public void close() throws IOException { if (!isOpen()) { @@ -388,6 +411,10 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel implements Sf lastMsg = null; } + Duration idleTimeout = CoreModuleProperties.IDLE_TIMEOUT.getRequired(getClient().getClientSession()); + if (GenericUtils.isNegativeOrNull(idleTimeout)) { + idleTimeout = CoreModuleProperties.IDLE_TIMEOUT.getRequiredDefault(); + } AbstractSftpClient client = getClient(); for (int ackIndex = 1; !pendingAcks.isEmpty(); ackIndex++) { SftpAckData ack = pendingAcks.removeFirst(); @@ -395,11 +422,15 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel implements Sf log.debug("close({}) processing ack #{}: {}", this, ackIndex, ack); } - SftpResponse response = client.response(SftpConstants.SSH_FXP_WRITE, ack.id); + Buffer buf = client.receive(ack.id, idleTimeout); + if (buf == null) { + log.debug("close({}) no ack response for {}", this, ack); + break; + } if (debugEnabled) { log.debug("close({}) processing ack #{} response for {}", this, ackIndex, ack); } - client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response.getId(), SftpStatus.parse(response)); + checkStatus(client, buf); } } finally { if (ownsHandle) { diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java index fd8787d9f..3ce0839fc 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java @@ -71,7 +71,8 @@ public final class SftpResponse { int length = buffer.getInt(); int type = buffer.getUByte(); int id = buffer.getInt(); - validateIncomingResponse(cmd, id, type, length, buffer); + // No need to validate the length here: the way we assemble these buffers guarantees that + // the length is reasonable and does not exceed buffer.available(). return new SftpResponse(cmd, id, type, length, buffer); } diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java index 5d1511fcf..93fea737d 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java @@ -20,6 +20,7 @@ package org.apache.sshd.sftp.client.impl; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.sftp.common.SftpConstants; +import org.apache.sshd.sftp.common.SftpException; /** * A representation of a SSH_FXP_STATUS record. @@ -69,7 +70,12 @@ public final class SftpStatus { return new SftpStatus(code, message, language); } - public static SftpStatus parse(SftpResponse response) { + public static SftpStatus parse(SftpResponse response) throws SftpException { + if (response.getType() != SftpConstants.SSH_FXP_STATUS) { + throw new SftpException( + SftpConstants.SSH_FX_BAD_MESSAGE, "Unexpected SFTP response: expected SSH_FXP_STATUS but got " + + SftpConstants.getCommandMessageName(response.getType())); + } return parse(response.getBuffer()); } }