This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit 5fa3cb02989acb0dc87fd8f44a26e15968432b6c Author: Lyor Goldstein <lgoldst...@apache.org> AuthorDate: Mon Nov 4 19:43:35 2019 +0200 [SSHD-931] Using an executor supplier instead of a specific instance in 'ScpCommandFactory' --- CHANGES.md | 8 ++-- docs/scp.md | 18 +++++++- .../server/command/AbstractFileSystemCommand.java | 3 +- .../java/org/apache/sshd/common/scp/ScpHelper.java | 51 +++++++++++++++------- .../apache/sshd/server/scp/ScpCommandFactory.java | 41 +++++++++-------- .../java/org/apache/sshd/client/scp/ScpTest.java | 5 ++- .../sshd/server/scp/ScpCommandFactoryTest.java | 9 ++-- 7 files changed, 91 insertions(+), 44 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 761c61c..d9ba927 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,8 +6,8 @@ ## Major code re-factoring -* `SftpSubSystemFactory` and its `Builder` use a `Supplier<CloseableExecutorService>` instead of -an executor instance in order to allow users to provide a "fresh" instance every time an SFTP +* `SftpSubSystemFactory,ScpCommandFactory` and their respective `Builder`(s) use a `Supplier<CloseableExecutorService>` +instead of an executor instance in order to allow users to provide a "fresh" instance every time an SFTP session is initiated and protect their instance from shutdown when session is destroyed: ```java @@ -73,7 +73,8 @@ the message type=30 (old request). * [SSHD-930](https://issues.apache.org/jira/browse/SSHD-930) - Added configuration allowing the user to specify whether client should wait for the server's identification before sending its own. -* [SSHD-931](https://issues.apache.org/jira/browse/SSHD-931) - Using an executor supplier instead of a specific instance in `SftpSubsystemFactory`. +* [SSHD-931](https://issues.apache.org/jira/browse/SSHD-931) - Using an executor supplier instead of a specific instance in `SftpSubsystemFactory` +and `ScpCommandFactory`. * [SSHD-934](https://issues.apache.org/jira/browse/SSHD-934) - Fixed ECDSA public key encoding into OpenSSH format. @@ -96,3 +97,4 @@ for the server's identification before sending KEX-INIT message. * [SSHD-949](https://issues.apache.org/jira/browse/SSHD-949) - Session should use cipher block size and not IV size to calculate padding. * [SSHD-953](https://issues.apache.org/jira/browse/SSHD-953) - Parse and strip quoted command arguments when executing a server-side command via local shell. + diff --git a/docs/scp.md b/docs/scp.md index 840cadf..54007b4 100644 --- a/docs/scp.md +++ b/docs/scp.md @@ -128,5 +128,21 @@ different sensitivity via `DirectoryScanner#setCaseSensitive` call (or executes ### Server-side SCP +Setting up SCP support on the server side is straightforward - simply initialize a `ScpCommandFactory` and +set it as the **primary** command factory. If support for commands other than SCP is also required then provide +the extra commands factory as a **delegate** of the `ScpCommandFactory`. The SCP factory will intercept the SCP +command and execute it, while propagating all other commands to the delegate. If no delegate configured then the +non-SCP command is deemed as having failed (same as if it were rejected by the delegate). + +```java +ScpCommandFactory factory = new ScpCommandFactory.Builder() + .withwithDelegate(new MyCommandDelegate()) + .build(); + +SshServer sshd = ...create an instance... +sshd.setCommandFactory(factory); +``` + The `ScpCommandFactory` allows users to attach an `ScpFileOpener` and/or `ScpTransferEventListener` having the same behavior as the client - i.e., -monitoring and intervention on the accessed local files. +monitoring and intervention on the accessed local files. Furthermore, the factory can also be configured with a custom executor service for +executing the requested copy commands as well as controlling the internal buffer sizes used to copy files. diff --git a/sshd-core/src/main/java/org/apache/sshd/server/command/AbstractFileSystemCommand.java b/sshd-core/src/main/java/org/apache/sshd/server/command/AbstractFileSystemCommand.java index ff330ed..7bffb2d 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/command/AbstractFileSystemCommand.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/command/AbstractFileSystemCommand.java @@ -35,7 +35,8 @@ public abstract class AbstractFileSystemCommand extends AbstractCommandSupport i protected FileSystem fileSystem; - public AbstractFileSystemCommand(String command, CloseableExecutorService executorService) { + public AbstractFileSystemCommand( + String command, CloseableExecutorService executorService) { super(command, executorService); } diff --git a/sshd-scp/src/main/java/org/apache/sshd/common/scp/ScpHelper.java b/sshd-scp/src/main/java/org/apache/sshd/common/scp/ScpHelper.java index c9e1696..91e7118 100644 --- a/sshd-scp/src/main/java/org/apache/sshd/common/scp/ScpHelper.java +++ b/sshd-scp/src/main/java/org/apache/sshd/common/scp/ScpHelper.java @@ -159,9 +159,14 @@ public class ScpHelper extends AbstractLoggingBean implements SessionHolder<Sess }); } - public void receive(Path local, boolean recursive, boolean shouldBeDir, boolean preserve, int bufferSize) throws IOException { - Path localPath = Objects.requireNonNull(local, "No local path").normalize().toAbsolutePath(); - Path path = opener.resolveIncomingReceiveLocation(getSession(), localPath, recursive, shouldBeDir, preserve); + public void receive( + Path local, boolean recursive, boolean shouldBeDir, boolean preserve, int bufferSize) + throws IOException { + Path localPath = Objects.requireNonNull(local, "No local path") + .normalize() + .toAbsolutePath(); + Path path = opener.resolveIncomingReceiveLocation( + getSession(), localPath, recursive, shouldBeDir, preserve); receive((session, line, isDir, time) -> { if (recursive && isDir) { receiveDir(line, path, time, preserve, bufferSize); @@ -226,9 +231,14 @@ public class ScpHelper extends AbstractLoggingBean implements SessionHolder<Sess } } - public void receiveDir(String header, Path local, ScpTimestamp time, boolean preserve, int bufferSize) throws IOException { - Path path = Objects.requireNonNull(local, "No local path").normalize().toAbsolutePath(); - if (log.isDebugEnabled()) { + public void receiveDir( + String header, Path local, ScpTimestamp time, boolean preserve, int bufferSize) + throws IOException { + Path path = Objects.requireNonNull(local, "No local path") + .normalize() + .toAbsolutePath(); + boolean debugEnabled = log.isDebugEnabled(); + if (debugEnabled) { log.debug("receiveDir({})[{}] Receiving directory {} - preserve={}, time={}, buffer-size={}", this, header, path, preserve, time, bufferSize); } @@ -244,7 +254,8 @@ public class ScpHelper extends AbstractLoggingBean implements SessionHolder<Sess } Session session = getSession(); - Path file = opener.resolveIncomingFilePath(session, path, name, preserve, perms, time); + Path file = opener.resolveIncomingFilePath( + session, path, name, preserve, perms, time); ack(); @@ -254,9 +265,10 @@ public class ScpHelper extends AbstractLoggingBean implements SessionHolder<Sess try { for (;;) { header = readLine(); - if (log.isDebugEnabled()) { + if (debugEnabled) { log.debug("receiveDir({})[{}] Received header: {}", this, file, header); } + if (header.startsWith("C")) { receiveFile(header, file, time, preserve, bufferSize); time = null; @@ -280,18 +292,25 @@ public class ScpHelper extends AbstractLoggingBean implements SessionHolder<Sess listener.endFolderEvent(session, FileOperation.RECEIVE, path, perms, null); } - public void receiveFile(String header, Path local, ScpTimestamp time, boolean preserve, int bufferSize) throws IOException { - Path path = Objects.requireNonNull(local, "No local path").normalize().toAbsolutePath(); + public void receiveFile( + String header, Path local, ScpTimestamp time, boolean preserve, int bufferSize) + throws IOException { + Path path = Objects.requireNonNull(local, "No local path") + .normalize() + .toAbsolutePath(); if (log.isDebugEnabled()) { log.debug("receiveFile({})[{}] Receiving file {} - preserve={}, time={}, buffer-size={}", - this, header, path, preserve, time, bufferSize); + this, header, path, preserve, time, bufferSize); } - ScpTargetStreamResolver targetStreamResolver = opener.createScpTargetStreamResolver(getSession(), path); + ScpTargetStreamResolver targetStreamResolver = + opener.createScpTargetStreamResolver(getSession(), path); receiveStream(header, targetStreamResolver, time, preserve, bufferSize); } - public void receiveStream(String header, ScpTargetStreamResolver resolver, ScpTimestamp time, boolean preserve, int bufferSize) throws IOException { + public void receiveStream( + String header, ScpTargetStreamResolver resolver, ScpTimestamp time, boolean preserve, int bufferSize) + throws IOException { if (!header.startsWith("C")) { throw new IOException("receiveStream(" + resolver + ") Expected a C message but got '" + header + "'"); } @@ -313,7 +332,7 @@ public class ScpHelper extends AbstractLoggingBean implements SessionHolder<Sess if (length == 0L) { if (debugEnabled) { log.debug("receiveStream({})[{}] zero file size (perhaps special file) using copy buffer size={}", - this, resolver, MIN_RECEIVE_BUFFER_SIZE); + this, resolver, MIN_RECEIVE_BUFFER_SIZE); } bufSize = MIN_RECEIVE_BUFFER_SIZE; } else { @@ -377,7 +396,9 @@ public class ScpHelper extends AbstractLoggingBean implements SessionHolder<Sess } } - public void send(Collection<String> paths, boolean recursive, boolean preserve, int bufferSize) throws IOException { + public void send( + Collection<String> paths, boolean recursive, boolean preserve, int bufferSize) + throws IOException { int readyCode = readAck(false); boolean debugEnabled = log.isDebugEnabled(); if (debugEnabled) { diff --git a/sshd-scp/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java b/sshd-scp/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java index 6f7503b..0e051ad 100644 --- a/sshd-scp/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java +++ b/sshd-scp/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java @@ -20,6 +20,7 @@ package org.apache.sshd.server.scp; import java.util.Collection; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.function.Supplier; import org.apache.sshd.common.scp.ScpFileOpener; import org.apache.sshd.common.scp.ScpFileOpenerHolder; @@ -29,7 +30,6 @@ import org.apache.sshd.common.util.EventListenerUtils; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ObjectBuilder; import org.apache.sshd.common.util.threads.CloseableExecutorService; -import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; import org.apache.sshd.server.command.AbstractDelegatingCommandFactory; import org.apache.sshd.server.command.Command; import org.apache.sshd.server.command.CommandFactory; @@ -44,9 +44,7 @@ import org.apache.sshd.server.command.CommandFactory; */ public class ScpCommandFactory extends AbstractDelegatingCommandFactory - implements ScpFileOpenerHolder, - Cloneable, - ExecutorServiceCarrier { + implements ScpFileOpenerHolder, Cloneable { public static final String SCP_FACTORY_NAME = "scp"; @@ -70,8 +68,9 @@ public class ScpCommandFactory return this; } - public Builder withExecutorService(CloseableExecutorService service) { - factory.setExecutorService(service); + public Builder withExecutorServiceProvider( + Supplier<? extends CloseableExecutorService> provider) { + factory.setExecutorServiceProvider(provider); return this; } @@ -101,7 +100,7 @@ public class ScpCommandFactory } } - private CloseableExecutorService executors; + private Supplier<? extends CloseableExecutorService> executorsProvider; private ScpFileOpener fileOpener; private int sendBufferSize = ScpHelper.MIN_SEND_BUFFER_SIZE; private int receiveBufferSize = ScpHelper.MIN_RECEIVE_BUFFER_SIZE; @@ -110,7 +109,8 @@ public class ScpCommandFactory public ScpCommandFactory() { super(SCP_FACTORY_NAME); - listenerProxy = EventListenerUtils.proxyWrapper(ScpTransferEventListener.class, getClass().getClassLoader(), listeners); + listenerProxy = EventListenerUtils.proxyWrapper( + ScpTransferEventListener.class, getClass().getClassLoader(), listeners); } @Override @@ -123,20 +123,17 @@ public class ScpCommandFactory this.fileOpener = fileOpener; } - @Override - public CloseableExecutorService getExecutorService() { - return executors; + public Supplier<? extends CloseableExecutorService> getExecutorServiceProvider() { + return executorsProvider; } /** - * @param service An {@link CloseableExecutorService} to be used when + * @param provider A {@link Supplier} of {@link CloseableExecutorService} to be used when * starting {@link ScpCommand} execution. If {@code null} then a single-threaded - * ad-hoc service is used. <B>Note:</B> the service will <U>not</U> be shutdown - * when the command is terminated - unless it is the ad-hoc service, which will be - * shutdown regardless + * ad-hoc service is used. */ - public void setExecutorService(CloseableExecutorService service) { - executors = service; + public void setExecutorServiceProvider(Supplier<? extends CloseableExecutorService> provider) { + executorsProvider = provider; } public int getSendBufferSize() { @@ -211,18 +208,24 @@ public class ScpCommandFactory @Override protected Command executeSupportedCommand(String command) { return new ScpCommand(command, - getExecutorService(), + resolveExecutorService(command), getSendBufferSize(), getReceiveBufferSize(), getScpFileOpener(), listenerProxy); } + protected CloseableExecutorService resolveExecutorService(String command) { + Supplier<? extends CloseableExecutorService> provider = getExecutorServiceProvider(); + return (provider == null) ? null : provider.get(); + } + @Override public ScpCommandFactory clone() { try { ScpCommandFactory other = getClass().cast(super.clone()); // clone the listeners set as well other.listeners = new CopyOnWriteArraySet<>(this.listeners); - other.listenerProxy = EventListenerUtils.proxyWrapper(ScpTransferEventListener.class, getClass().getClassLoader(), other.listeners); + other.listenerProxy = EventListenerUtils.proxyWrapper( + ScpTransferEventListener.class, getClass().getClassLoader(), other.listeners); return other; } catch (CloneNotSupportedException e) { throw new RuntimeException(e); // un-expected... diff --git a/sshd-scp/src/test/java/org/apache/sshd/client/scp/ScpTest.java b/sshd-scp/src/test/java/org/apache/sshd/client/scp/ScpTest.java index b622f53..b245e8a 100644 --- a/sshd-scp/src/test/java/org/apache/sshd/client/scp/ScpTest.java +++ b/sshd-scp/src/test/java/org/apache/sshd/client/scp/ScpTest.java @@ -904,9 +904,10 @@ public class ScpTest extends BaseTestSupport { sshd.setCommandFactory(new ScpCommandFactory() { @Override public Command createCommand(ChannelSession channel, String command) { - ValidateUtils.checkTrue(command.startsWith(ScpHelper.SCP_COMMAND_PREFIX), "Bad SCP command: %s", command); + ValidateUtils.checkTrue( + command.startsWith(ScpHelper.SCP_COMMAND_PREFIX), "Bad SCP command: %s", command); return new InternalScpCommand(command, - getExecutorService(), + resolveExecutorService(command), getSendBufferSize(), getReceiveBufferSize(), DefaultScpFileOpener.INSTANCE, ScpTransferEventListener.EMPTY); diff --git a/sshd-scp/src/test/java/org/apache/sshd/server/scp/ScpCommandFactoryTest.java b/sshd-scp/src/test/java/org/apache/sshd/server/scp/ScpCommandFactoryTest.java index ff2c130..ff1f1c5 100644 --- a/sshd-scp/src/test/java/org/apache/sshd/server/scp/ScpCommandFactoryTest.java +++ b/sshd-scp/src/test/java/org/apache/sshd/server/scp/ScpCommandFactoryTest.java @@ -19,6 +19,8 @@ package org.apache.sshd.server.scp; +import java.util.function.Supplier; + import org.apache.sshd.common.scp.ScpHelper; import org.apache.sshd.common.util.threads.CloseableExecutorService; import org.apache.sshd.server.command.CommandFactory; @@ -48,7 +50,7 @@ public class ScpCommandFactoryTest extends BaseTestSupport { public void testBuilderDefaultFactoryValues() { ScpCommandFactory factory = new ScpCommandFactory.Builder().build(); assertNull("Mismatched delegate", factory.getDelegateCommandFactory()); - assertNull("Mismatched executor", factory.getExecutorService()); + assertNull("Mismatched executor", factory.getExecutorServiceProvider()); assertEquals("Mismatched send size", ScpHelper.MIN_SEND_BUFFER_SIZE, factory.getSendBufferSize()); assertEquals("Mismatched receive size", ScpHelper.MIN_RECEIVE_BUFFER_SIZE, factory.getReceiveBufferSize()); } @@ -60,16 +62,17 @@ public class ScpCommandFactoryTest extends BaseTestSupport { public void testBuilderCorrectlyInitializesFactory() { CommandFactory delegate = dummyFactory(); CloseableExecutorService service = dummyExecutor(); + Supplier<CloseableExecutorService> provider = () -> service; int receiveSize = Short.MAX_VALUE; int sendSize = receiveSize + Long.SIZE; ScpCommandFactory factory = new ScpCommandFactory.Builder() .withDelegate(delegate) - .withExecutorService(service) + .withExecutorServiceProvider(provider) .withSendBufferSize(sendSize) .withReceiveBufferSize(receiveSize) .build(); assertSame("Mismatched delegate", delegate, factory.getDelegateCommandFactory()); - assertSame("Mismatched executor", service, factory.getExecutorService()); + assertSame("Mismatched executor", provider, factory.getExecutorServiceProvider()); assertEquals("Mismatched send size", sendSize, factory.getSendBufferSize()); assertEquals("Mismatched receive size", receiveSize, factory.getReceiveBufferSize()); }