This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit 15d6c73538e0477d4a57783b4ff5aebdbad405c0 Author: Lyor Goldstein <lgoldst...@apache.org> AuthorDate: Thu Apr 23 21:10:27 2020 +0300 [SSHD-979] Allow more flexible extension of improved SFTP API implementations --- .../sshd/client/subsystem/sftp/RawSftpClient.java | 1 - .../subsystem/sftp/impl/AbstractSftpClient.java | 34 +---- .../subsystem/sftp/impl/DefaultSftpClient.java | 163 +++++++++++++-------- .../client/subsystem/sftp/impl/SftpAckData.java | 45 ++++++ .../subsystem/sftp/impl/SftpInputStreamAsync.java | 78 +++++----- .../subsystem/sftp/impl/SftpOutputStreamAsync.java | 46 +++--- .../subsystem/sftp/impl/SftpRemotePathChannel.java | 24 +-- .../sshd/server/subsystem/sftp/FileHandle.java | 1 + 8 files changed, 214 insertions(+), 178 deletions(-) diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java index 560ce55..0f1afd7 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java @@ -49,5 +49,4 @@ public interface RawSftpClient { * @throws IOException If connection closed or interrupted */ Buffer receive(int id, long timeout) throws IOException; - } diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java index 211f3ee..338f666 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java @@ -58,6 +58,8 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer; * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public abstract class AbstractSftpClient extends AbstractSubsystemClient implements SftpClient, RawSftpClient { + public static final int INIT_COMMAND_SIZE = Byte.BYTES /* command */ + Integer.BYTES /* version */; + /** * Property used to avoid large buffers when {@link #write(Handle, long, byte[], int, int)} is invoked with a large * buffer size. @@ -158,7 +160,7 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme /** * Sends the specified command, waits for the response and then invokes {@link #checkResponseStatus(int, Buffer)} - * + * * @param cmd The command to send * @param request The request {@link Buffer} * @throws IOException If failed to send, receive or check the returned status @@ -872,7 +874,7 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme @Override public void write(Handle handle, long fileOffset, byte[] src, int srcOffset, int len) throws IOException { // do some bounds checking first - if ((fileOffset < 0) || (srcOffset < 0) || (len < 0)) { + if ((fileOffset < 0L) || (srcOffset < 0) || (len < 0)) { throw new IllegalArgumentException( "write(" + handle + ") please ensure all parameters " + " are non-negative values: file-offset=" + fileOffset @@ -1281,13 +1283,6 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer); } - /** - * @param path The remote directory path - * @return An {@link Iterable} that can be used to iterate over all the directory entries (unlike - * {@link #readDir(Handle)}) - * @throws IOException If failed to access the remote site - * @see #readDir(Handle) - */ @Override public Iterable<DirEntry> readDir(String path) throws IOException { if (!isOpen()) { @@ -1297,13 +1292,6 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme return new SftpIterableDirEntry(this, path); } - /** - * @param handle A directory {@link Handle} - * @return An {@link Iterable} that can be used to iterate over all the directory entries (like - * {@link #readDir(String)}). <B>Note:</B> the iterable instance is not re-usable - i.e., files - * can be iterated only <U>once</U> - * @throws IOException If failed to access the directory - */ @Override public Iterable<DirEntry> listDir(Handle handle) throws IOException { if (!isOpen()) { @@ -1313,20 +1301,8 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme return new StfpIterableDirHandle(this, handle); } - /** - * Opens an {@link FileChannel} on the specified remote path - * - * @param path The remote path - * @param modes The access mode(s) - if {@code null}/empty then the {@link #DEFAULT_CHANNEL_MODES} are used - * @return The open {@link FileChannel} - <B>Note:</B> do not close this owner client instance until the - * channel is no longer needed since it uses the client for providing the channel's - * functionality. - * @throws IOException If failed to open the channel - * @see java.nio.channels.Channels#newInputStream(java.nio.channels.ReadableByteChannel) - * @see java.nio.channels.Channels#newOutputStream(java.nio.channels.WritableByteChannel) - */ @Override - public SftpRemotePathChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException { + public FileChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException { return new SftpRemotePathChannel(path, this, false, GenericUtils.isEmpty(modes) ? DEFAULT_CHANNEL_MODES : modes); } diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java index 2c13f5b..cf00605 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java @@ -50,6 +50,8 @@ import org.apache.sshd.common.SshException; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelAsyncOutputStream; import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.io.IoOutputStream; +import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.subsystem.sftp.SftpConstants; @@ -79,7 +81,7 @@ public class DefaultSftpClient extends AbstractSftpClient { this.nameDecodingCharset = PropertyResolverUtils.getCharset( clientSession, NAME_DECODING_CHARSET, DEFAULT_NAME_DECODING_CHARSET); this.clientSession = Objects.requireNonNull(clientSession, "No client session"); - this.channel = new SftpChannelSubsystem(); + this.channel = createSftpChannelSubsystem(clientSession); clientSession.getService(ConnectionService.class).registerChannel(channel); long initializationTimeout = clientSession.getLongProperty( @@ -153,7 +155,7 @@ public class DefaultSftpClient extends AbstractSftpClient { /** * Receive binary data - * + * * @param buf The buffer for the incoming data * @param start Offset in buffer to place the data * @param len Available space in buffer for the data @@ -278,7 +280,10 @@ public class DefaultSftpClient extends AbstractSftpClient { buf.putInt(id); buf.putBuffer(buffer); } - channel.getAsyncIn().writePacket(buf).verify(); + + IoOutputStream asyncIn = channel.getAsyncIn(); + IoWriteFuture writeFuture = asyncIn.writePacket(buf); + writeFuture.verify(); return id; } @@ -315,7 +320,7 @@ public class DefaultSftpClient extends AbstractSftpClient { if (buffer != null) { return buffer; } - if (idleTimeout > 0) { + if (idleTimeout > 0L) { try { messages.wait(idleTimeout); } catch (InterruptedException e) { @@ -327,65 +332,37 @@ public class DefaultSftpClient extends AbstractSftpClient { } protected void init(long initializationTimeout) throws IOException { - ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid initialization timeout: %d", initializationTimeout); - // Send init packet - Buffer buf = new ByteArrayBuffer(9); - buf.putInt(5); + Buffer buf = new ByteArrayBuffer(INIT_COMMAND_SIZE + SshConstants.SSH_PACKET_HEADER_LEN); + buf.putInt(INIT_COMMAND_SIZE); buf.putByte((byte) SftpConstants.SSH_FXP_INIT); buf.putInt(SftpConstants.SFTP_V6); - channel.getAsyncIn().writePacket(buf).verify(); - - Buffer buffer; - Integer reqId; - synchronized (messages) { - /* - * We need to use a timeout since if the remote server does not support SFTP, we will not know it - * immediately. This is due to the fact that the request for the subsystem does not contain a reply as to - * its success or failure. Thus, the SFTP channel is created by the client, but there is no one on the other - * side to reply - thus the need for the timeout - */ - for (long remainingTimeout = initializationTimeout; - (remainingTimeout > 0L) && messages.isEmpty() && (!isClosing()) && isOpen();) { - try { - long sleepStart = System.nanoTime(); - messages.wait(remainingTimeout); - long sleepEnd = System.nanoTime(); - long sleepDuration = sleepEnd - sleepStart; - long sleepMillis = TimeUnit.NANOSECONDS.toMillis(sleepDuration); - if (sleepMillis < 1L) { - remainingTimeout--; - } else { - remainingTimeout -= sleepMillis; - } - } catch (InterruptedException e) { - throw (IOException) new InterruptedIOException( - "Interrupted init() while " + remainingTimeout + " msec. remaining").initCause(e); - } - } - if (isClosing() || (!isOpen())) { - throw new EOFException("Closing while await init message"); - } - - if (messages.isEmpty()) { - throw new SocketTimeoutException( - "No incoming initialization response received within " + initializationTimeout + " msec."); - } + boolean traceEnabled = log.isTraceEnabled(); + IoOutputStream asyncIn = channel.getAsyncIn(); + ClientChannel clientChannel = getClientChannel(); + if (traceEnabled) { + log.trace("init({}) send SSH_FXP_INIT", clientChannel); + } + IoWriteFuture writeFuture = asyncIn.writePacket(buf); + writeFuture.verify(); - Collection<Integer> ids = messages.keySet(); - Iterator<Integer> iter = ids.iterator(); - reqId = iter.next(); - buffer = messages.remove(reqId); + if (traceEnabled) { + log.trace("init({}) wait for SSH_FXP_INIT respose (timeout={})", clientChannel, initializationTimeout); } + Buffer buffer = waitForInitResponse(initializationTimeout); + handleInitResponse(buffer); + } + protected void handleInitResponse(Buffer buffer) throws IOException { + boolean traceEnabled = log.isTraceEnabled(); + ClientChannel clientChannel = getClientChannel(); int length = buffer.getInt(); int type = buffer.getUByte(); int id = buffer.getInt(); - boolean traceEnabled = log.isTraceEnabled(); if (traceEnabled) { - log.trace("init({}) id={} type={} len={}", - getClientChannel(), id, SftpConstants.getCommandMessageName(type), length); + log.trace("handleInitResponse({}) id={} type={} len={}", + clientChannel, id, SftpConstants.getCommandMessageName(type), length); } if (type == SftpConstants.SSH_FXP_VERSION) { @@ -395,14 +372,14 @@ public class DefaultSftpClient extends AbstractSftpClient { versionHolder.set(id); if (traceEnabled) { - log.trace("init({}) version={}", getClientChannel(), versionHolder); + log.trace("handleInitResponse({}) version={}", clientChannel, versionHolder); } while (buffer.available() > 0) { String name = buffer.getString(); byte[] data = buffer.getBytes(); if (traceEnabled) { - log.trace("init({}) added extension={}", getClientChannel(), name); + log.trace("handleInitResponse({}) added extension={}", clientChannel, name); } extensions.put(name, data); } @@ -411,8 +388,8 @@ public class DefaultSftpClient extends AbstractSftpClient { String msg = buffer.getString(); String lang = buffer.getString(); if (traceEnabled) { - log.trace("init({})[id={}] - status: {} [{}] {}", - getClientChannel(), id, SftpConstants.getStatusName(substatus), lang, msg); + log.trace("handleInitResponse({})[id={}] - status: {} [{}] {}", + clientChannel, id, SftpConstants.getStatusName(substatus), lang, msg); } throwStatusException(SftpConstants.SSH_FXP_INIT, id, substatus, msg, lang); @@ -426,6 +403,51 @@ public class DefaultSftpClient extends AbstractSftpClient { } } + protected Buffer waitForInitResponse(long initializationTimeout) throws IOException { + ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid initialization timeout: %d", initializationTimeout); + + synchronized (messages) { + /* + * We need to use a timeout since if the remote server does not support SFTP, we will not know it + * immediately. This is due to the fact that the request for the subsystem does not contain a reply as to + * its success or failure. Thus, the SFTP channel is created by the client, but there is no one on the other + * side to reply - thus the need for the timeout + */ + for (long remainingTimeout = initializationTimeout; + (remainingTimeout > 0L) && messages.isEmpty() && (!isClosing()) && isOpen();) { + try { + long sleepStart = System.nanoTime(); + messages.wait(remainingTimeout); + long sleepEnd = System.nanoTime(); + long sleepDuration = sleepEnd - sleepStart; + long sleepMillis = TimeUnit.NANOSECONDS.toMillis(sleepDuration); + if (sleepMillis < 1L) { + remainingTimeout--; + } else { + remainingTimeout -= sleepMillis; + } + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException( + "Interrupted init() while " + remainingTimeout + " msec. remaining").initCause(e); + } + } + + if (isClosing() || (!isOpen())) { + throw new EOFException("Closing while await init message"); + } + + if (messages.isEmpty()) { + throw new SocketTimeoutException( + "No incoming initialization response received within " + initializationTimeout + " msec."); + } + + Collection<Integer> ids = messages.keySet(); + Iterator<Integer> iter = ids.iterator(); + Integer reqId = iter.next(); + return messages.remove(reqId); + } + } + /** * @param selector The {@link SftpVersionSelector} to use - ignored if {@code null} * @return The selected version (may be same as current) @@ -485,9 +507,12 @@ public class DefaultSftpClient extends AbstractSftpClient { return selected; } - private class SftpChannelSubsystem extends ChannelSubsystem { + protected ChannelSubsystem createSftpChannelSubsystem(ClientSession clientSession) { + return new SftpChannelSubsystem(); + } - SftpChannelSubsystem() { + protected class SftpChannelSubsystem extends ChannelSubsystem { + protected SftpChannelSubsystem() { super(SftpConstants.SFTP_SUBSYSTEM_NAME); } @@ -506,14 +531,19 @@ public class DefaultSftpClient extends AbstractSftpClient { addPendingRequest(Channel.CHANNEL_SUBSYSTEM, wantReply); writePacket(buffer); - asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) { + asyncIn = createAsyncInput(session); + setOut(createStdOutputStream(session)); + setErr(createErrOutputStream(session)); + } + + protected ChannelAsyncOutputStream createAsyncInput(Session session) { + return new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) { @SuppressWarnings("synthetic-access") @Override protected CloseFuture doCloseGracefully() { try { sendEof(); } catch (IOException e) { - Session session = getSession(); session.exceptionCaught(e); } return super.doCloseGracefully(); @@ -521,7 +551,7 @@ public class DefaultSftpClient extends AbstractSftpClient { @Override protected Buffer createSendBuffer(Buffer buffer, Channel channel, long length) { - if (buffer.rpos() >= 9 && length == buffer.available()) { + if ((buffer.rpos() >= 9) && (length == buffer.available())) { int rpos = buffer.rpos(); int wpos = buffer.wpos(); buffer.rpos(rpos - 9); @@ -537,7 +567,10 @@ public class DefaultSftpClient extends AbstractSftpClient { } } }; - out = new OutputStream() { + } + + protected OutputStream createStdOutputStream(Session session) { + return new OutputStream() { private final byte[] singleByte = new byte[1]; @Override @@ -553,7 +586,11 @@ public class DefaultSftpClient extends AbstractSftpClient { data(b, off, len); } }; - err = new ByteArrayOutputStream(); + } + + protected OutputStream createErrOutputStream(Session session) { + // TODO use some limit in case some data is constantly written to this stream + return new ByteArrayOutputStream(); } } } diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java new file mode 100644 index 0000000..e21d48e --- /dev/null +++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.client.subsystem.sftp.impl; + +/** + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +@SuppressWarnings("checkstyle:VisibilityModifier") +public class SftpAckData { + public final int id; + public final long offset; + public final int length; + + public SftpAckData(int id, long offset, int length) { + this.id = id; + this.offset = offset; + this.length = length; + } + + @Override + public String toString() { + return getClass().getSimpleName() + + "[id=" + id + + ", offset=" + offset + + ", length=" + length + + "]"; + } +} diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java index ec3e593..c1a73d2 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java @@ -32,6 +32,9 @@ import org.apache.sshd.client.subsystem.sftp.SftpClient; import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle; import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode; import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.channel.Channel; +import org.apache.sshd.common.channel.Window; +import org.apache.sshd.common.session.Session; import org.apache.sshd.common.subsystem.sftp.SftpConstants; import org.apache.sshd.common.subsystem.sftp.SftpHelper; import org.apache.sshd.common.util.buffer.Buffer; @@ -39,30 +42,18 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.io.InputStreamWithChannel; public class SftpInputStreamAsync extends InputStreamWithChannel { - - static class Ack { - int id; - long offset; - int length; - - Ack(int id, long offset, int length) { - this.id = id; - this.offset = offset; - this.length = length; - } - } + protected final byte[] bb = new byte[1]; + protected final int bufferSize; + protected final long fileSize; + protected Buffer buffer; + protected CloseableHandle handle; + protected long requestOffset; + protected long clientOffset; + protected final Deque<SftpAckData> pendingReads = new LinkedList<>(); + protected boolean eofIndicator; private final AbstractSftpClient client; private final String path; - private final byte[] bb = new byte[1]; - private final int bufferSize; - private final long fileSize; - private Buffer buffer; - private CloseableHandle handle; - private long requestOffset; - private long clientOffset; - private final Deque<Ack> pendingReads = new LinkedList<>(); - private boolean eofIndicator; public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, String path, Collection<OpenMode> mode) throws IOException { @@ -156,8 +147,9 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { if (!isOpen()) { throw new IOException("transferTo(" + getPath() + ") stream closed"); } + long orgOffset = clientOffset; - while (!eofIndicator && max > 0) { + while ((!eofIndicator) && (max > 0L)) { if (hasNoData()) { fillData(); if (eofIndicator && hasNoData()) { @@ -179,11 +171,11 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { return clientOffset - orgOffset; } - @SuppressWarnings("PMD.MissingOverride") public long transferTo(OutputStream out) throws IOException { if (!isOpen()) { throw new IOException("transferTo(" + getPath() + ") stream closed"); } + long orgOffset = clientOffset; while (!eofIndicator) { if (hasNoData()) { @@ -207,41 +199,46 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { if (!isOpen()) { throw new IOException("skip(" + getPath() + ") stream closed"); } - if (clientOffset == 0 && pendingReads.isEmpty()) { + if ((clientOffset == 0L) && pendingReads.isEmpty()) { clientOffset = n; return n; } return super.skip(n); } - boolean hasNoData() { - return buffer == null || buffer.available() == 0; + protected boolean hasNoData() { + return (buffer == null) || (buffer.available() == 0); } - void sendRequests() throws IOException { + protected void sendRequests() throws IOException { if (!eofIndicator) { - long windowSize = client.getChannel().getLocalWindow().getMaxSize(); - while (pendingReads.size() < (int) (windowSize / bufferSize) && requestOffset < fileSize + bufferSize + Channel channel = client.getChannel(); + Window localWindow = channel.getLocalWindow(); + long windowSize = localWindow.getMaxSize(); + Session session = client.getSession(); + byte[] id = handle.getIdentifier(); + + while ((pendingReads.size() < (int) (windowSize / bufferSize)) && (requestOffset < (fileSize + bufferSize)) || pendingReads.isEmpty()) { - Buffer buf = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, - 23 /* sftp packet */ + 16 + handle.getIdentifier().length); + Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, + 23 /* sftp packet */ + 16 + id.length); buf.rpos(23); buf.wpos(23); - buf.putBytes(handle.getIdentifier()); + buf.putBytes(id); buf.putLong(requestOffset); buf.putInt(bufferSize); int reqId = client.send(SftpConstants.SSH_FXP_READ, buf); - pendingReads.add(new Ack(reqId, requestOffset, bufferSize)); + pendingReads.add(new SftpAckData(reqId, requestOffset, bufferSize)); requestOffset += bufferSize; } } } - void fillData() throws IOException { - Ack ack = pendingReads.pollFirst(); + protected void fillData() throws IOException { + SftpAckData ack = pendingReads.pollFirst(); if (ack != null) { pollBuffer(ack); - if (!eofIndicator && clientOffset < ack.offset) { + if ((!eofIndicator) && (clientOffset < ack.offset)) { // we are actually missing some data // so request is synchronously byte[] data = new byte[(int) (ack.offset - clientOffset + buffer.available())]; @@ -250,7 +247,8 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { AtomicReference<Boolean> eof = new AtomicReference<>(); while (cur < nb) { int dlen = client.read(handle, clientOffset, data, cur, nb - cur, eof); - eofIndicator = dlen < 0 || eof.get() != null && eof.get(); + Boolean eofSignal = eof.getAndSet(null); + eofIndicator = (dlen < 0) || ((eofSignal != null) && eofSignal.booleanValue()); cur += dlen; } buffer.getRawBytes(data, nb, buffer.available()); @@ -259,7 +257,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { } } - void pollBuffer(Ack ack) throws IOException { + protected void pollBuffer(SftpAckData ack) throws IOException { Buffer buf = client.receive(ack.id); int length = buf.getInt(); int type = buf.getUByte(); @@ -270,7 +268,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { int rpos = buf.rpos(); buf.rpos(rpos + dlen); Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf, client.getVersion()); - eofIndicator = b != null && b; + eofIndicator = (b != null) && b.booleanValue(); buf.rpos(rpos); buf.wpos(rpos + dlen); this.buffer = buf; @@ -298,7 +296,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel { try { try { while (!pendingReads.isEmpty()) { - Ack ack = pendingReads.removeFirst(); + SftpAckData ack = pendingReads.removeFirst(); pollBuffer(ack); } } finally { diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java index d8f1974..b5b809c 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java @@ -28,6 +28,7 @@ import org.apache.sshd.client.subsystem.sftp.SftpClient; import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle; import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode; import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.session.Session; import org.apache.sshd.common.subsystem.sftp.SftpConstants; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; @@ -39,27 +40,15 @@ import org.apache.sshd.common.util.io.OutputStreamWithChannel; * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public class SftpOutputStreamAsync extends OutputStreamWithChannel { - - static class Ack { - int id; - long offset; - int length; - - Ack(int id, long offset, int length) { - this.id = id; - this.offset = offset; - this.length = length; - } - } + protected final byte[] bb = new byte[1]; + protected final int bufferSize; + protected Buffer buffer; + protected CloseableHandle handle; + protected long offset; + protected final Deque<SftpAckData> pendingWrites = new LinkedList<>(); private final AbstractSftpClient client; private final String path; - private final byte[] bb = new byte[1]; - private final int bufferSize; - private Buffer buffer; - private CloseableHandle handle; - private long offset; - private final Deque<Ack> pendingWrites = new LinkedList<>(); public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize, String path, Collection<OpenMode> mode) throws IOException { @@ -112,14 +101,17 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { @Override public void write(byte[] b, int off, int len) throws IOException { + byte[] id = handle.getIdentifier(); + Session session = client.getSession(); + do { if (buffer == null) { - buffer = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize); - int hdr = (9 + 16 + 8 + handle.getIdentifier().length) + buffer.wpos(); + buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize); + int hdr = 9 + 16 + 8 + id.length + buffer.wpos(); buffer.rpos(hdr); buffer.wpos(hdr); } - int max = bufferSize - (9 + 16 + handle.getIdentifier().length + 72); + int max = bufferSize - (9 + 16 + id.length + 72); int nb = Math.min(len, max - (buffer.wpos() - buffer.rpos())); buffer.putRawBytes(b, off, nb); if (buffer.available() == max) { @@ -137,9 +129,9 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { } for (;;) { - Ack ack = pendingWrites.peek(); + SftpAckData ack = pendingWrites.peek(); if (ack != null) { - Buffer response = client.receive(ack.id, 0); + Buffer response = client.receive(ack.id, 0L); if (response != null) { pendingWrites.removeFirst(); client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response); @@ -154,7 +146,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { byte[] id = handle.getIdentifier(); int avail = buffer.available(); Buffer buf; - if (buffer.rpos() >= 16 + id.length) { + if (buffer.rpos() >= (16 + id.length)) { int wpos = buffer.wpos(); buffer.rpos(buffer.rpos() - 16 - id.length); buffer.wpos(buffer.rpos()); @@ -171,7 +163,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { } int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf); - pendingWrites.add(new Ack(reqId, offset, avail)); + pendingWrites.add(new SftpAckData(reqId, offset, avail)); offset += avail; buffer = null; @@ -182,11 +174,11 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel { if (isOpen()) { try { try { - if (buffer != null && buffer.available() > 0) { + if ((buffer != null) && (buffer.available() > 0)) { flush(); } while (!pendingWrites.isEmpty()) { - Ack ack = pendingWrites.removeFirst(); + SftpAckData ack = pendingWrites.removeFirst(); Buffer response = client.receive(ack.id); client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response); } diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java index b6275d1..b698c9f 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java @@ -227,18 +227,6 @@ public class SftpRemotePathChannel extends FileChannel { return doWrite(buffers, -1L); } - static class Ack { - int id; - long offset; - int length; - - Ack(int id, long offset, int length) { - this.id = id; - this.offset = offset; - this.length = length; - } - } - protected long doWrite(Collection<? extends ByteBuffer> buffers, long position) throws IOException { ensureOpen(WRITE_MODES); @@ -362,11 +350,12 @@ public class SftpRemotePathChannel extends FileChannel { try { beginBlocking("transferTo"); + // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE + @SuppressWarnings("resource") SftpInputStreamAsync input = new SftpInputStreamAsync( (AbstractSftpClient) sftp, copySize, position, count, getRemotePath(), handle); totalRead = input.transferTo(count, target); - // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE eof = input.isEof(); completed = true; } finally { @@ -379,7 +368,7 @@ public class SftpRemotePathChannel extends FileChannel { this, position, count, copySize, totalRead, eof, target); } - return totalRead > 0L ? totalRead : eof ? -1L : 0L; + return (totalRead > 0L) ? totalRead : eof ? -1L : 0L; } @Override @@ -400,7 +389,6 @@ public class SftpRemotePathChannel extends FileChannel { } boolean completed = false; - long curPos = (position >= 0L) ? position : posTracker.get(); long totalRead = 0L; byte[] buffer = new byte[(int) Math.min(copySize, count)]; @@ -408,6 +396,8 @@ public class SftpRemotePathChannel extends FileChannel { try { beginBlocking("transferFrom"); + // DO NOT CLOSE THE OUTPUT STREAM AS IT WOULD CLOSE THE HANDLE + @SuppressWarnings("resource") SftpOutputStreamAsync output = new SftpOutputStreamAsync( (AbstractSftpClient) sftp, copySize, getRemotePath(), handle); @@ -417,7 +407,6 @@ public class SftpRemotePathChannel extends FileChannel { int read = src.read(wrap); if (read > 0) { output.write(buffer, 0, read); - curPos += read; totalRead += read; } else { break; @@ -441,8 +430,7 @@ public class SftpRemotePathChannel extends FileChannel { @Override public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { throw new UnsupportedOperationException( - "map(" + getRemotePath() + ")" - + "[" + mode + "," + position + "," + size + "] N/A"); + "map(" + getRemotePath() + ")[" + mode + "," + position + "," + size + "] N/A"); } @Override diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java index 6676eb4..9e4237d 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java @@ -110,6 +110,7 @@ public class FileHandle extends Handle { return read(data, 0, data.length, offset); } + @SuppressWarnings("resource") public int read(byte[] data, int doff, int length, long offset) throws IOException { SeekableByteChannel channel = getFileChannel(); channel = channel.position(offset);