This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git


The following commit(s) were added to refs/heads/master by this push:
     new 66feece  [SSHD-1022] Added logging to SftpInputStreamAsync
66feece is described below

commit 66feecec850beb532b4de75771ae2413c2cc5070
Author: Lyor Goldstein <lgoldst...@apache.org>
AuthorDate: Thu Jun 25 22:04:38 2020 +0300

    [SSHD-1022] Added logging to SftpInputStreamAsync
---
 .../subsystem/sftp/impl/SftpInputStreamAsync.java  | 189 +++++++++++++++------
 1 file changed, 137 insertions(+), 52 deletions(-)

diff --git 
a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
 
b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
index b012220..5eddbe9 100644
--- 
a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
+++ 
b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
@@ -40,8 +40,11 @@ import org.apache.sshd.common.subsystem.sftp.SftpHelper;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.InputStreamWithChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SftpInputStreamAsync extends InputStreamWithChannel {
+    protected final Logger log;
     protected final byte[] bb = new byte[1];
     protected final int bufferSize;
     protected final long fileSize;
@@ -52,12 +55,13 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
     protected final Deque<SftpAckData> pendingReads = new LinkedList<>();
     protected boolean eofIndicator;
 
-    private final AbstractSftpClient client;
+    private final AbstractSftpClient clientInstance;
     private final String path;
 
     public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
                                 String path, Collection<OpenMode> mode) throws 
IOException {
-        this.client = Objects.requireNonNull(client, "No SFTP client 
instance");
+        this.log = LoggerFactory.getLogger(getClass());
+        this.clientInstance = Objects.requireNonNull(client, "No SFTP client 
instance");
         this.path = path;
         this.handle = client.open(path, mode);
         this.bufferSize = bufferSize;
@@ -66,7 +70,8 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
 
     public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, 
long clientOffset, long fileSize,
                                 String path, CloseableHandle handle) {
-        this.client = Objects.requireNonNull(client, "No SFTP client 
instance");
+        this.log = LoggerFactory.getLogger(getClass());
+        this.clientInstance = Objects.requireNonNull(client, "No SFTP client 
instance");
         this.path = path;
         this.handle = handle;
         this.bufferSize = bufferSize;
@@ -80,7 +85,7 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
      * @return {@link SftpClient} instance used to access the remote file
      */
     public final AbstractSftpClient getClient() {
-        return client;
+        return clientInstance;
     }
 
     /**
@@ -120,8 +125,9 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
         if (!isOpen()) {
             throw new IOException("read(" + getPath() + ") stream closed");
         }
+
         int idx = off;
-        while (len > 0 && !eofIndicator) {
+        while ((len > 0) && (!eofIndicator)) {
             if (hasNoData()) {
                 fillData();
                 if (eofIndicator && (hasNoData())) {
@@ -136,8 +142,9 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
                 clientOffset += nb;
             }
         }
+
         int res = off - idx;
-        if (res == 0 && eofIndicator) {
+        if ((res == 0) && eofIndicator) {
             res = -1;
         }
         return res;
@@ -149,6 +156,7 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
         }
 
         long orgOffset = clientOffset;
+        long totalRequested = max;
         while ((!eofIndicator) && (max > 0L)) {
             if (hasNoData()) {
                 fillData();
@@ -168,7 +176,12 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
                 max -= toRead;
             }
         }
-        return clientOffset - orgOffset;
+
+        long numXfered = clientOffset - orgOffset;
+        if (log.isDebugEnabled()) {
+            log.debug("transferTo({}) transferred {}/{} bytes", numXfered, 
totalRequested);
+        }
+        return numXfered;
     }
 
     @SuppressWarnings("PMD.MissingOverride")
@@ -192,7 +205,12 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
                 clientOffset += nb;
             }
         }
-        return clientOffset - orgOffset;
+
+        long numXfered = clientOffset - orgOffset;
+        if (log.isDebugEnabled()) {
+            log.debug("transferTo({}) transferred {} bytes", this, numXfered);
+        }
+        return numXfered;
     }
 
     @Override
@@ -200,10 +218,15 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
         if (!isOpen()) {
             throw new IOException("skip(" + getPath() + ") stream closed");
         }
+
         if ((clientOffset == 0L) && pendingReads.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("skip({}) virtual skip of {} bytes", this, n);
+            }
             clientOffset = n;
             return n;
         }
+
         return super.skip(n);
     }
 
@@ -212,64 +235,109 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
     }
 
     protected void sendRequests() throws IOException {
-        if (!eofIndicator) {
-            Channel channel = client.getChannel();
-            Window localWindow = channel.getLocalWindow();
-            long windowSize = localWindow.getMaxSize();
-            Session session = client.getSession();
-            byte[] id = handle.getIdentifier();
-
-            while ((pendingReads.size() < (int) (windowSize / bufferSize)) && 
(requestOffset < (fileSize + bufferSize))
-                    || pendingReads.isEmpty()) {
-                Buffer buf = 
session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
-                        23 /* sftp packet */ + 16 + id.length);
-                buf.rpos(23);
-                buf.wpos(23);
-                buf.putBytes(id);
-                buf.putLong(requestOffset);
-                buf.putInt(bufferSize);
-                int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
-                pendingReads.add(new SftpAckData(reqId, requestOffset, 
bufferSize));
-                requestOffset += bufferSize;
+        if (eofIndicator) {
+            if (log.isDebugEnabled()) {
+                log.debug("sendRequests({}) EOF indicator ON", this);
             }
+            return;
+        }
+
+        AbstractSftpClient client = getClient();
+        Channel channel = client.getChannel();
+        Window localWindow = channel.getLocalWindow();
+        long windowSize = localWindow.getMaxSize();
+        Session session = client.getSession();
+        byte[] id = handle.getIdentifier();
+        boolean traceEnabled = log.isTraceEnabled();
+        for (int ackIndex = 1;
+             (pendingReads.size() < (int) (windowSize / bufferSize)) && 
(requestOffset < (fileSize + bufferSize))
+                     || pendingReads.isEmpty();
+             ackIndex++) {
+            Buffer buf = 
session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
+                    23 /* sftp packet */ + 16 + id.length);
+            buf.rpos(23);
+            buf.wpos(23);
+            buf.putBytes(id);
+            buf.putLong(requestOffset);
+            buf.putInt(bufferSize);
+            int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
+            SftpAckData ack = new SftpAckData(reqId, requestOffset, 
bufferSize);
+            if (traceEnabled) {
+                log.trace("sendRequests({}) enqueue pending ack #{}: {}", 
this, ackIndex, ack);
+            }
+            pendingReads.add(ack);
+            requestOffset += bufferSize;
         }
     }
 
     protected void fillData() throws IOException {
         SftpAckData ack = pendingReads.pollFirst();
-        if (ack != null) {
-            pollBuffer(ack);
-            if ((!eofIndicator) && (clientOffset < ack.offset)) {
-                // we are actually missing some data
-                // so request is synchronously
-                byte[] data = new byte[(int) (ack.offset - clientOffset + 
buffer.available())];
-                int cur = 0;
-                int nb = (int) (ack.offset - clientOffset);
-                AtomicReference<Boolean> eof = new AtomicReference<>();
-                while (cur < nb) {
-                    int dlen = client.read(handle, clientOffset, data, cur, nb 
- cur, eof);
-                    Boolean eofSignal = eof.getAndSet(null);
-                    eofIndicator = (dlen < 0) || ((eofSignal != null) && 
eofSignal.booleanValue());
-                    cur += dlen;
+        boolean traceEnabled = log.isTraceEnabled();
+        if (ack == null) {
+            if (traceEnabled) {
+                log.trace("fillData({}) no pending ack", this);
+            }
+            return;
+        }
+
+        if (traceEnabled) {
+            log.trace("fillData({}) process ack={}", this, ack);
+        }
+        pollBuffer(ack);
+
+        if ((!eofIndicator) && (clientOffset < ack.offset)) {
+            // we are actually missing some data
+            // so request is synchronously
+            byte[] data = new byte[(int) (ack.offset - clientOffset + 
buffer.available())];
+            int nb = (int) (ack.offset - clientOffset);
+            if (traceEnabled) {
+                log.trace("fillData({}) reading {} bytes", this, nb);
+            }
+
+            AtomicReference<Boolean> eof = new AtomicReference<>();
+            SftpClient client = getClient();
+            for (int cur = 0; cur < nb;) {
+                int dlen = client.read(handle, clientOffset, data, cur, nb - 
cur, eof);
+                Boolean eofSignal = eof.getAndSet(null);
+                if ((dlen < 0) || ((eofSignal != null) && 
eofSignal.booleanValue())) {
+                    eofIndicator = true;
                 }
-                buffer.getRawBytes(data, nb, buffer.available());
-                buffer = new ByteArrayBuffer(data);
+                cur += dlen;
+            }
+
+            if (traceEnabled) {
+                log.trace("fillData({}) read {} bytes - EOF={}", this, nb, 
eofIndicator);
             }
+
+            buffer.getRawBytes(data, nb, buffer.available());
+            buffer = new ByteArrayBuffer(data);
         }
     }
 
     protected void pollBuffer(SftpAckData ack) throws IOException {
+        boolean traceEnabled = log.isTraceEnabled();
+        if (traceEnabled) {
+            log.trace("pollBuffer({}) polling ack={}", this, ack);
+        }
+
+        AbstractSftpClient client = getClient();
         Buffer buf = client.receive(ack.id);
         int length = buf.getInt();
         int type = buf.getUByte();
         int id = buf.getInt();
+        if (traceEnabled) {
+            log.trace("pollBuffer({}) response={} for ack={} - len={}", this, 
type, ack, length);
+        }
         client.validateIncomingResponse(SshConstants.SSH_MSG_CHANNEL_DATA, id, 
type, length, buf);
+
         if (type == SftpConstants.SSH_FXP_DATA) {
             int dlen = buf.getInt();
             int rpos = buf.rpos();
             buf.rpos(rpos + dlen);
             Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf, 
client.getVersion());
-            eofIndicator = (b != null) && b.booleanValue();
+            if ((b != null) && b.booleanValue()) {
+                eofIndicator = true;
+            }
             buf.rpos(rpos);
             buf.wpos(rpos + dlen);
             this.buffer = buf;
@@ -293,19 +361,36 @@ public class SftpInputStreamAsync extends 
InputStreamWithChannel {
 
     @Override
     public void close() throws IOException {
-        if (isOpen()) {
+        if (!isOpen()) {
+            return;
+        }
+
+        try {
+            boolean debugEnabled = log.isDebugEnabled();
             try {
-                try {
-                    while (!pendingReads.isEmpty()) {
-                        SftpAckData ack = pendingReads.removeFirst();
-                        pollBuffer(ack);
+                for (int ackIndex = 1; !pendingReads.isEmpty(); ackIndex++) {
+                    SftpAckData ack = pendingReads.removeFirst();
+                    if (debugEnabled) {
+                        log.debug("close({}) process ack #{}: {}", this, 
ackIndex, ack);
                     }
-                } finally {
-                    handle.close();
+                    pollBuffer(ack);
                 }
             } finally {
-                handle = null;
+                if (debugEnabled) {
+                    log.debug("close({}) closing file handle", this);
+                }
+                handle.close();
             }
+        } finally {
+            handle = null;
         }
     }
+
+    @Override
+    public String toString() {
+        SftpClient client = getClient();
+        return getClass().getSimpleName()
+               + "[" + client.getSession() + "]"
+               + "[" + getPath() + "]";
+    }
 }

Reply via email to