This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 5fa3cb02989acb0dc87fd8f44a26e15968432b6c
Author: Lyor Goldstein <lgoldst...@apache.org>
AuthorDate: Mon Nov 4 19:43:35 2019 +0200

    [SSHD-931] Using an executor supplier instead of a specific instance in 
'ScpCommandFactory'
---
 CHANGES.md                                         |  8 ++--
 docs/scp.md                                        | 18 +++++++-
 .../server/command/AbstractFileSystemCommand.java  |  3 +-
 .../java/org/apache/sshd/common/scp/ScpHelper.java | 51 +++++++++++++++-------
 .../apache/sshd/server/scp/ScpCommandFactory.java  | 41 +++++++++--------
 .../java/org/apache/sshd/client/scp/ScpTest.java   |  5 ++-
 .../sshd/server/scp/ScpCommandFactoryTest.java     |  9 ++--
 7 files changed, 91 insertions(+), 44 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 761c61c..d9ba927 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,8 +6,8 @@
 
 ## Major code re-factoring
 
-* `SftpSubSystemFactory` and its `Builder` use a 
`Supplier<CloseableExecutorService>` instead of
-an executor instance in order to allow users to provide a "fresh" instance 
every time an SFTP
+* `SftpSubSystemFactory,ScpCommandFactory` and their respective `Builder`(s) 
use a `Supplier<CloseableExecutorService>`
+instead of an executor instance in order to allow users to provide a "fresh" 
instance every time an SFTP
 session is initiated and protect their instance from shutdown when session is 
destroyed:
 
 ```java
@@ -73,7 +73,8 @@ the message type=30 (old request).
 * [SSHD-930](https://issues.apache.org/jira/browse/SSHD-930) - Added 
configuration allowing the user to specify whether client should wait
 for the server's identification before sending its own.
 
-* [SSHD-931](https://issues.apache.org/jira/browse/SSHD-931) - Using an 
executor supplier instead of a specific instance in `SftpSubsystemFactory`.
+* [SSHD-931](https://issues.apache.org/jira/browse/SSHD-931) - Using an 
executor supplier instead of a specific instance in `SftpSubsystemFactory`
+and `ScpCommandFactory`.
 
 * [SSHD-934](https://issues.apache.org/jira/browse/SSHD-934) - Fixed ECDSA 
public key encoding into OpenSSH format.
 
@@ -96,3 +97,4 @@ for the server's identification before sending KEX-INIT 
message.
 * [SSHD-949](https://issues.apache.org/jira/browse/SSHD-949) - Session should 
use cipher block size and not IV size to calculate padding.
 
 * [SSHD-953](https://issues.apache.org/jira/browse/SSHD-953) - Parse and strip 
quoted command arguments when executing a server-side command via local shell.
+
diff --git a/docs/scp.md b/docs/scp.md
index 840cadf..54007b4 100644
--- a/docs/scp.md
+++ b/docs/scp.md
@@ -128,5 +128,21 @@ different sensitivity via 
`DirectoryScanner#setCaseSensitive` call (or executes
 
 ### Server-side SCP
 
+Setting up SCP support on the server side is straightforward - simply 
initialize a `ScpCommandFactory` and
+set it as the **primary** command factory. If support for commands other than 
SCP is also required then provide
+the extra commands factory as a **delegate** of the `ScpCommandFactory`. The 
SCP factory will intercept the SCP
+command and execute it, while propagating all other commands to the delegate. 
If no delegate configured then the
+non-SCP command is deemed as having failed (same as if it were rejected by the 
delegate).
+
+```java
+ScpCommandFactory factory = new ScpCommandFactory.Builder()
+    .withwithDelegate(new MyCommandDelegate())
+    .build();
+
+SshServer sshd = ...create an instance...
+sshd.setCommandFactory(factory);
+```
+
 The `ScpCommandFactory` allows users to attach an `ScpFileOpener` and/or 
`ScpTransferEventListener` having the same behavior as the client - i.e.,
-monitoring and intervention on the accessed local files.
+monitoring and intervention on the accessed local files. Furthermore, the 
factory can also be configured with a custom executor service for
+executing the requested copy commands as well as controlling the internal 
buffer sizes used to copy files.
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/command/AbstractFileSystemCommand.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/command/AbstractFileSystemCommand.java
index ff330ed..7bffb2d 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/command/AbstractFileSystemCommand.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/command/AbstractFileSystemCommand.java
@@ -35,7 +35,8 @@ public abstract class AbstractFileSystemCommand extends 
AbstractCommandSupport i
 
     protected FileSystem fileSystem;
 
-    public AbstractFileSystemCommand(String command, CloseableExecutorService 
executorService) {
+    public AbstractFileSystemCommand(
+            String command, CloseableExecutorService executorService) {
         super(command, executorService);
     }
 
diff --git a/sshd-scp/src/main/java/org/apache/sshd/common/scp/ScpHelper.java 
b/sshd-scp/src/main/java/org/apache/sshd/common/scp/ScpHelper.java
index c9e1696..91e7118 100644
--- a/sshd-scp/src/main/java/org/apache/sshd/common/scp/ScpHelper.java
+++ b/sshd-scp/src/main/java/org/apache/sshd/common/scp/ScpHelper.java
@@ -159,9 +159,14 @@ public class ScpHelper extends AbstractLoggingBean 
implements SessionHolder<Sess
         });
     }
 
-    public void receive(Path local, boolean recursive, boolean shouldBeDir, 
boolean preserve, int bufferSize) throws IOException {
-        Path localPath = Objects.requireNonNull(local, "No local 
path").normalize().toAbsolutePath();
-        Path path = opener.resolveIncomingReceiveLocation(getSession(), 
localPath, recursive, shouldBeDir, preserve);
+    public void receive(
+            Path local, boolean recursive, boolean shouldBeDir, boolean 
preserve, int bufferSize)
+                throws IOException {
+        Path localPath = Objects.requireNonNull(local, "No local path")
+            .normalize()
+            .toAbsolutePath();
+        Path path = opener.resolveIncomingReceiveLocation(
+            getSession(), localPath, recursive, shouldBeDir, preserve);
         receive((session, line, isDir, time) -> {
             if (recursive && isDir) {
                 receiveDir(line, path, time, preserve, bufferSize);
@@ -226,9 +231,14 @@ public class ScpHelper extends AbstractLoggingBean 
implements SessionHolder<Sess
         }
     }
 
-    public void receiveDir(String header, Path local, ScpTimestamp time, 
boolean preserve, int bufferSize) throws IOException {
-        Path path = Objects.requireNonNull(local, "No local 
path").normalize().toAbsolutePath();
-        if (log.isDebugEnabled()) {
+    public void receiveDir(
+            String header, Path local, ScpTimestamp time, boolean preserve, 
int bufferSize)
+                throws IOException {
+        Path path = Objects.requireNonNull(local, "No local path")
+            .normalize()
+            .toAbsolutePath();
+        boolean debugEnabled = log.isDebugEnabled();
+        if (debugEnabled) {
             log.debug("receiveDir({})[{}] Receiving directory {} - 
preserve={}, time={}, buffer-size={}",
                       this, header, path, preserve, time, bufferSize);
         }
@@ -244,7 +254,8 @@ public class ScpHelper extends AbstractLoggingBean 
implements SessionHolder<Sess
         }
 
         Session session = getSession();
-        Path file = opener.resolveIncomingFilePath(session, path, name, 
preserve, perms, time);
+        Path file = opener.resolveIncomingFilePath(
+            session, path, name, preserve, perms, time);
 
         ack();
 
@@ -254,9 +265,10 @@ public class ScpHelper extends AbstractLoggingBean 
implements SessionHolder<Sess
         try {
             for (;;) {
                 header = readLine();
-                if (log.isDebugEnabled()) {
+                if (debugEnabled) {
                     log.debug("receiveDir({})[{}] Received header: {}", this, 
file, header);
                 }
+
                 if (header.startsWith("C")) {
                     receiveFile(header, file, time, preserve, bufferSize);
                     time = null;
@@ -280,18 +292,25 @@ public class ScpHelper extends AbstractLoggingBean 
implements SessionHolder<Sess
         listener.endFolderEvent(session, FileOperation.RECEIVE, path, perms, 
null);
     }
 
-    public void receiveFile(String header, Path local, ScpTimestamp time, 
boolean preserve, int bufferSize) throws IOException {
-        Path path = Objects.requireNonNull(local, "No local 
path").normalize().toAbsolutePath();
+    public void receiveFile(
+            String header, Path local, ScpTimestamp time, boolean preserve, 
int bufferSize)
+                throws IOException {
+        Path path = Objects.requireNonNull(local, "No local path")
+            .normalize()
+            .toAbsolutePath();
         if (log.isDebugEnabled()) {
             log.debug("receiveFile({})[{}] Receiving file {} - preserve={}, 
time={}, buffer-size={}",
-                      this, header, path, preserve, time, bufferSize);
+                  this, header, path, preserve, time, bufferSize);
         }
 
-        ScpTargetStreamResolver targetStreamResolver = 
opener.createScpTargetStreamResolver(getSession(), path);
+        ScpTargetStreamResolver targetStreamResolver =
+            opener.createScpTargetStreamResolver(getSession(), path);
         receiveStream(header, targetStreamResolver, time, preserve, 
bufferSize);
     }
 
-    public void receiveStream(String header, ScpTargetStreamResolver resolver, 
ScpTimestamp time, boolean preserve, int bufferSize) throws IOException {
+    public void receiveStream(
+            String header, ScpTargetStreamResolver resolver, ScpTimestamp 
time, boolean preserve, int bufferSize)
+                throws IOException {
         if (!header.startsWith("C")) {
             throw new IOException("receiveStream(" + resolver + ") Expected a 
C message but got '" + header + "'");
         }
@@ -313,7 +332,7 @@ public class ScpHelper extends AbstractLoggingBean 
implements SessionHolder<Sess
         if (length == 0L) {
             if (debugEnabled) {
                 log.debug("receiveStream({})[{}] zero file size (perhaps 
special file) using copy buffer size={}",
-                          this, resolver, MIN_RECEIVE_BUFFER_SIZE);
+                      this, resolver, MIN_RECEIVE_BUFFER_SIZE);
             }
             bufSize = MIN_RECEIVE_BUFFER_SIZE;
         } else {
@@ -377,7 +396,9 @@ public class ScpHelper extends AbstractLoggingBean 
implements SessionHolder<Sess
         }
     }
 
-    public void send(Collection<String> paths, boolean recursive, boolean 
preserve, int bufferSize) throws IOException {
+    public void send(
+            Collection<String> paths, boolean recursive, boolean preserve, int 
bufferSize)
+                throws IOException {
         int readyCode = readAck(false);
         boolean debugEnabled = log.isDebugEnabled();
         if (debugEnabled) {
diff --git 
a/sshd-scp/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java 
b/sshd-scp/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
index 6f7503b..0e051ad 100644
--- a/sshd-scp/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
+++ b/sshd-scp/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
@@ -20,6 +20,7 @@ package org.apache.sshd.server.scp;
 
 import java.util.Collection;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.Supplier;
 
 import org.apache.sshd.common.scp.ScpFileOpener;
 import org.apache.sshd.common.scp.ScpFileOpenerHolder;
@@ -29,7 +30,6 @@ import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ObjectBuilder;
 import org.apache.sshd.common.util.threads.CloseableExecutorService;
-import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 import org.apache.sshd.server.command.AbstractDelegatingCommandFactory;
 import org.apache.sshd.server.command.Command;
 import org.apache.sshd.server.command.CommandFactory;
@@ -44,9 +44,7 @@ import org.apache.sshd.server.command.CommandFactory;
  */
 public class ScpCommandFactory
         extends AbstractDelegatingCommandFactory
-        implements ScpFileOpenerHolder,
-        Cloneable,
-        ExecutorServiceCarrier {
+        implements ScpFileOpenerHolder, Cloneable {
 
     public static final String SCP_FACTORY_NAME = "scp";
 
@@ -70,8 +68,9 @@ public class ScpCommandFactory
             return this;
         }
 
-        public Builder withExecutorService(CloseableExecutorService service) {
-            factory.setExecutorService(service);
+        public Builder withExecutorServiceProvider(
+                Supplier<? extends CloseableExecutorService> provider) {
+            factory.setExecutorServiceProvider(provider);
             return this;
         }
 
@@ -101,7 +100,7 @@ public class ScpCommandFactory
         }
     }
 
-    private CloseableExecutorService executors;
+    private Supplier<? extends CloseableExecutorService> executorsProvider;
     private ScpFileOpener fileOpener;
     private int sendBufferSize = ScpHelper.MIN_SEND_BUFFER_SIZE;
     private int receiveBufferSize = ScpHelper.MIN_RECEIVE_BUFFER_SIZE;
@@ -110,7 +109,8 @@ public class ScpCommandFactory
 
     public ScpCommandFactory() {
         super(SCP_FACTORY_NAME);
-        listenerProxy = 
EventListenerUtils.proxyWrapper(ScpTransferEventListener.class, 
getClass().getClassLoader(), listeners);
+        listenerProxy = EventListenerUtils.proxyWrapper(
+            ScpTransferEventListener.class, getClass().getClassLoader(), 
listeners);
     }
 
     @Override
@@ -123,20 +123,17 @@ public class ScpCommandFactory
         this.fileOpener = fileOpener;
     }
 
-    @Override
-    public CloseableExecutorService getExecutorService() {
-        return executors;
+    public Supplier<? extends CloseableExecutorService> 
getExecutorServiceProvider() {
+        return executorsProvider;
     }
 
     /**
-     * @param service An {@link CloseableExecutorService} to be used when
+     * @param provider A {@link Supplier} of {@link CloseableExecutorService} 
to be used when
      * starting {@link ScpCommand} execution. If {@code null} then a 
single-threaded
-     * ad-hoc service is used. <B>Note:</B> the service will <U>not</U> be 
shutdown
-     * when the command is terminated - unless it is the ad-hoc service, which 
will be
-     * shutdown regardless
+     * ad-hoc service is used.
      */
-    public void setExecutorService(CloseableExecutorService service) {
-        executors = service;
+    public void setExecutorServiceProvider(Supplier<? extends 
CloseableExecutorService> provider) {
+        executorsProvider = provider;
     }
 
     public int getSendBufferSize() {
@@ -211,18 +208,24 @@ public class ScpCommandFactory
     @Override
     protected Command executeSupportedCommand(String command) {
         return new ScpCommand(command,
-                getExecutorService(),
+                resolveExecutorService(command),
                 getSendBufferSize(), getReceiveBufferSize(),
                 getScpFileOpener(), listenerProxy);
     }
 
+    protected CloseableExecutorService resolveExecutorService(String command) {
+        Supplier<? extends CloseableExecutorService> provider = 
getExecutorServiceProvider();
+        return (provider == null) ? null : provider.get();
+    }
+
     @Override
     public ScpCommandFactory clone() {
         try {
             ScpCommandFactory other = getClass().cast(super.clone());
             // clone the listeners set as well
             other.listeners = new CopyOnWriteArraySet<>(this.listeners);
-            other.listenerProxy = 
EventListenerUtils.proxyWrapper(ScpTransferEventListener.class, 
getClass().getClassLoader(), other.listeners);
+            other.listenerProxy = EventListenerUtils.proxyWrapper(
+                ScpTransferEventListener.class, getClass().getClassLoader(), 
other.listeners);
             return other;
         } catch (CloneNotSupportedException e) {
             throw new RuntimeException(e);    // un-expected...
diff --git a/sshd-scp/src/test/java/org/apache/sshd/client/scp/ScpTest.java 
b/sshd-scp/src/test/java/org/apache/sshd/client/scp/ScpTest.java
index b622f53..b245e8a 100644
--- a/sshd-scp/src/test/java/org/apache/sshd/client/scp/ScpTest.java
+++ b/sshd-scp/src/test/java/org/apache/sshd/client/scp/ScpTest.java
@@ -904,9 +904,10 @@ public class ScpTest extends BaseTestSupport {
         sshd.setCommandFactory(new ScpCommandFactory() {
             @Override
             public Command createCommand(ChannelSession channel, String 
command) {
-                
ValidateUtils.checkTrue(command.startsWith(ScpHelper.SCP_COMMAND_PREFIX), "Bad 
SCP command: %s", command);
+                ValidateUtils.checkTrue(
+                    command.startsWith(ScpHelper.SCP_COMMAND_PREFIX), "Bad SCP 
command: %s", command);
                 return new InternalScpCommand(command,
-                        getExecutorService(),
+                        resolveExecutorService(command),
                         getSendBufferSize(), getReceiveBufferSize(),
                         DefaultScpFileOpener.INSTANCE,
                         ScpTransferEventListener.EMPTY);
diff --git 
a/sshd-scp/src/test/java/org/apache/sshd/server/scp/ScpCommandFactoryTest.java 
b/sshd-scp/src/test/java/org/apache/sshd/server/scp/ScpCommandFactoryTest.java
index ff2c130..ff1f1c5 100644
--- 
a/sshd-scp/src/test/java/org/apache/sshd/server/scp/ScpCommandFactoryTest.java
+++ 
b/sshd-scp/src/test/java/org/apache/sshd/server/scp/ScpCommandFactoryTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.sshd.server.scp;
 
+import java.util.function.Supplier;
+
 import org.apache.sshd.common.scp.ScpHelper;
 import org.apache.sshd.common.util.threads.CloseableExecutorService;
 import org.apache.sshd.server.command.CommandFactory;
@@ -48,7 +50,7 @@ public class ScpCommandFactoryTest extends BaseTestSupport {
     public void testBuilderDefaultFactoryValues() {
         ScpCommandFactory factory = new ScpCommandFactory.Builder().build();
         assertNull("Mismatched delegate", factory.getDelegateCommandFactory());
-        assertNull("Mismatched executor", factory.getExecutorService());
+        assertNull("Mismatched executor", 
factory.getExecutorServiceProvider());
         assertEquals("Mismatched send size", ScpHelper.MIN_SEND_BUFFER_SIZE, 
factory.getSendBufferSize());
         assertEquals("Mismatched receive size", 
ScpHelper.MIN_RECEIVE_BUFFER_SIZE, factory.getReceiveBufferSize());
     }
@@ -60,16 +62,17 @@ public class ScpCommandFactoryTest extends BaseTestSupport {
     public void testBuilderCorrectlyInitializesFactory() {
         CommandFactory delegate = dummyFactory();
         CloseableExecutorService service = dummyExecutor();
+        Supplier<CloseableExecutorService> provider = () -> service;
         int receiveSize = Short.MAX_VALUE;
         int sendSize = receiveSize + Long.SIZE;
         ScpCommandFactory factory = new ScpCommandFactory.Builder()
                 .withDelegate(delegate)
-                .withExecutorService(service)
+                .withExecutorServiceProvider(provider)
                 .withSendBufferSize(sendSize)
                 .withReceiveBufferSize(receiveSize)
                 .build();
         assertSame("Mismatched delegate", delegate, 
factory.getDelegateCommandFactory());
-        assertSame("Mismatched executor", service, 
factory.getExecutorService());
+        assertSame("Mismatched executor", provider, 
factory.getExecutorServiceProvider());
         assertEquals("Mismatched send size", sendSize, 
factory.getSendBufferSize());
         assertEquals("Mismatched receive size", receiveSize, 
factory.getReceiveBufferSize());
     }

Reply via email to