This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push: new 66feece [SSHD-1022] Added logging to SftpInputStreamAsync 66feece is described below commit 66feecec850beb532b4de75771ae2413c2cc5070 Author: Lyor Goldstein <lgoldst...@apache.org> AuthorDate: Thu Jun 25 22:04:38 2020 +0300 [SSHD-1022] Added logging to SftpInputStreamAsync --- .../subsystem/sftp/impl/SftpInputStreamAsync.java | 189 +++++++++++++++------ 1 file changed, 137 insertions(+), 52 deletions(-) diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java index b012220..5eddbe9 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java @@ -40,8 +40,11 @@ import org.apache.sshd.common.subsystem.sftp.SftpHelper; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.io.InputStreamWithChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SftpInputStreamAsync extends InputStreamWithChannel { + protected final Logger log; protected final byte[] bb = new byte[1]; protected final int bufferSize; protected final long fileSize; @@ -52,12 +55,13 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { protected final Deque<SftpAckData> pendingReads = new LinkedList<>(); protected boolean eofIndicator; - private final AbstractSftpClient client; + private final AbstractSftpClient clientInstance; private final String path; public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, String path, Collection<OpenMode> mode) throws IOException { - this.client = Objects.requireNonNull(client, "No SFTP client instance"); + this.log = LoggerFactory.getLogger(getClass()); + this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance"); this.path = path; this.handle = client.open(path, mode); this.bufferSize = bufferSize; @@ -66,7 +70,8 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, long clientOffset, long fileSize, String path, CloseableHandle handle) { - this.client = Objects.requireNonNull(client, "No SFTP client instance"); + this.log = LoggerFactory.getLogger(getClass()); + this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance"); this.path = path; this.handle = handle; this.bufferSize = bufferSize; @@ -80,7 +85,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { * @return {@link SftpClient} instance used to access the remote file */ public final AbstractSftpClient getClient() { - return client; + return clientInstance; } /** @@ -120,8 +125,9 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { if (!isOpen()) { throw new IOException("read(" + getPath() + ") stream closed"); } + int idx = off; - while (len > 0 && !eofIndicator) { + while ((len > 0) && (!eofIndicator)) { if (hasNoData()) { fillData(); if (eofIndicator && (hasNoData())) { @@ -136,8 +142,9 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { clientOffset += nb; } } + int res = off - idx; - if (res == 0 && eofIndicator) { + if ((res == 0) && eofIndicator) { res = -1; } return res; @@ -149,6 +156,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { } long orgOffset = clientOffset; + long totalRequested = max; while ((!eofIndicator) && (max > 0L)) { if (hasNoData()) { fillData(); @@ -168,7 +176,12 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { max -= toRead; } } - return clientOffset - orgOffset; + + long numXfered = clientOffset - orgOffset; + if (log.isDebugEnabled()) { + log.debug("transferTo({}) transferred {}/{} bytes", numXfered, totalRequested); + } + return numXfered; } @SuppressWarnings("PMD.MissingOverride") @@ -192,7 +205,12 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { clientOffset += nb; } } - return clientOffset - orgOffset; + + long numXfered = clientOffset - orgOffset; + if (log.isDebugEnabled()) { + log.debug("transferTo({}) transferred {} bytes", this, numXfered); + } + return numXfered; } @Override @@ -200,10 +218,15 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { if (!isOpen()) { throw new IOException("skip(" + getPath() + ") stream closed"); } + if ((clientOffset == 0L) && pendingReads.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("skip({}) virtual skip of {} bytes", this, n); + } clientOffset = n; return n; } + return super.skip(n); } @@ -212,64 +235,109 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { } protected void sendRequests() throws IOException { - if (!eofIndicator) { - Channel channel = client.getChannel(); - Window localWindow = channel.getLocalWindow(); - long windowSize = localWindow.getMaxSize(); - Session session = client.getSession(); - byte[] id = handle.getIdentifier(); - - while ((pendingReads.size() < (int) (windowSize / bufferSize)) && (requestOffset < (fileSize + bufferSize)) - || pendingReads.isEmpty()) { - Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, - 23 /* sftp packet */ + 16 + id.length); - buf.rpos(23); - buf.wpos(23); - buf.putBytes(id); - buf.putLong(requestOffset); - buf.putInt(bufferSize); - int reqId = client.send(SftpConstants.SSH_FXP_READ, buf); - pendingReads.add(new SftpAckData(reqId, requestOffset, bufferSize)); - requestOffset += bufferSize; + if (eofIndicator) { + if (log.isDebugEnabled()) { + log.debug("sendRequests({}) EOF indicator ON", this); } + return; + } + + AbstractSftpClient client = getClient(); + Channel channel = client.getChannel(); + Window localWindow = channel.getLocalWindow(); + long windowSize = localWindow.getMaxSize(); + Session session = client.getSession(); + byte[] id = handle.getIdentifier(); + boolean traceEnabled = log.isTraceEnabled(); + for (int ackIndex = 1; + (pendingReads.size() < (int) (windowSize / bufferSize)) && (requestOffset < (fileSize + bufferSize)) + || pendingReads.isEmpty(); + ackIndex++) { + Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, + 23 /* sftp packet */ + 16 + id.length); + buf.rpos(23); + buf.wpos(23); + buf.putBytes(id); + buf.putLong(requestOffset); + buf.putInt(bufferSize); + int reqId = client.send(SftpConstants.SSH_FXP_READ, buf); + SftpAckData ack = new SftpAckData(reqId, requestOffset, bufferSize); + if (traceEnabled) { + log.trace("sendRequests({}) enqueue pending ack #{}: {}", this, ackIndex, ack); + } + pendingReads.add(ack); + requestOffset += bufferSize; } } protected void fillData() throws IOException { SftpAckData ack = pendingReads.pollFirst(); - if (ack != null) { - pollBuffer(ack); - if ((!eofIndicator) && (clientOffset < ack.offset)) { - // we are actually missing some data - // so request is synchronously - byte[] data = new byte[(int) (ack.offset - clientOffset + buffer.available())]; - int cur = 0; - int nb = (int) (ack.offset - clientOffset); - AtomicReference<Boolean> eof = new AtomicReference<>(); - while (cur < nb) { - int dlen = client.read(handle, clientOffset, data, cur, nb - cur, eof); - Boolean eofSignal = eof.getAndSet(null); - eofIndicator = (dlen < 0) || ((eofSignal != null) && eofSignal.booleanValue()); - cur += dlen; + boolean traceEnabled = log.isTraceEnabled(); + if (ack == null) { + if (traceEnabled) { + log.trace("fillData({}) no pending ack", this); + } + return; + } + + if (traceEnabled) { + log.trace("fillData({}) process ack={}", this, ack); + } + pollBuffer(ack); + + if ((!eofIndicator) && (clientOffset < ack.offset)) { + // we are actually missing some data + // so request is synchronously + byte[] data = new byte[(int) (ack.offset - clientOffset + buffer.available())]; + int nb = (int) (ack.offset - clientOffset); + if (traceEnabled) { + log.trace("fillData({}) reading {} bytes", this, nb); + } + + AtomicReference<Boolean> eof = new AtomicReference<>(); + SftpClient client = getClient(); + for (int cur = 0; cur < nb;) { + int dlen = client.read(handle, clientOffset, data, cur, nb - cur, eof); + Boolean eofSignal = eof.getAndSet(null); + if ((dlen < 0) || ((eofSignal != null) && eofSignal.booleanValue())) { + eofIndicator = true; } - buffer.getRawBytes(data, nb, buffer.available()); - buffer = new ByteArrayBuffer(data); + cur += dlen; + } + + if (traceEnabled) { + log.trace("fillData({}) read {} bytes - EOF={}", this, nb, eofIndicator); } + + buffer.getRawBytes(data, nb, buffer.available()); + buffer = new ByteArrayBuffer(data); } } protected void pollBuffer(SftpAckData ack) throws IOException { + boolean traceEnabled = log.isTraceEnabled(); + if (traceEnabled) { + log.trace("pollBuffer({}) polling ack={}", this, ack); + } + + AbstractSftpClient client = getClient(); Buffer buf = client.receive(ack.id); int length = buf.getInt(); int type = buf.getUByte(); int id = buf.getInt(); + if (traceEnabled) { + log.trace("pollBuffer({}) response={} for ack={} - len={}", this, type, ack, length); + } client.validateIncomingResponse(SshConstants.SSH_MSG_CHANNEL_DATA, id, type, length, buf); + if (type == SftpConstants.SSH_FXP_DATA) { int dlen = buf.getInt(); int rpos = buf.rpos(); buf.rpos(rpos + dlen); Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf, client.getVersion()); - eofIndicator = (b != null) && b.booleanValue(); + if ((b != null) && b.booleanValue()) { + eofIndicator = true; + } buf.rpos(rpos); buf.wpos(rpos + dlen); this.buffer = buf; @@ -293,19 +361,36 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { @Override public void close() throws IOException { - if (isOpen()) { + if (!isOpen()) { + return; + } + + try { + boolean debugEnabled = log.isDebugEnabled(); try { - try { - while (!pendingReads.isEmpty()) { - SftpAckData ack = pendingReads.removeFirst(); - pollBuffer(ack); + for (int ackIndex = 1; !pendingReads.isEmpty(); ackIndex++) { + SftpAckData ack = pendingReads.removeFirst(); + if (debugEnabled) { + log.debug("close({}) process ack #{}: {}", this, ackIndex, ack); } - } finally { - handle.close(); + pollBuffer(ack); } } finally { - handle = null; + if (debugEnabled) { + log.debug("close({}) closing file handle", this); + } + handle.close(); } + } finally { + handle = null; } } + + @Override + public String toString() { + SftpClient client = getClient(); + return getClass().getSimpleName() + + "[" + client.getSession() + "]" + + "[" + getPath() + "]"; + } }