Repository: mina-sshd
Updated Branches:
  refs/heads/master f8d842b1e -> 330d17c81


[SSHD-480] Replace all "new Thread(...)" calls with "ExecutorService#submit"


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/009d832d
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/009d832d
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/009d832d

Branch: refs/heads/master
Commit: 009d832d4a1316bcfc1e2921c386d078dcb3a3ce
Parents: f8d842b
Author: Lyor Goldstein <lgoldst...@vmware.com>
Authored: Mon Jun 1 16:01:21 2015 +0300
Committer: Lyor Goldstein <lgoldst...@vmware.com>
Committed: Mon Jun 1 16:01:21 2015 +0300

----------------------------------------------------------------------
 src/tomcat-apr-5.5.23-sources.jar               | Bin 0 -> 25227 bytes
 .../main/java/org/apache/sshd/SshClient.java    |   9 +-
 .../main/java/org/apache/sshd/SshServer.java    |   9 +-
 .../java/org/apache/sshd/agent/SshAgent.java    |   3 +-
 .../sshd/agent/common/AbstractAgentClient.java  |  28 ++---
 .../sshd/agent/common/AbstractAgentProxy.java   |  96 +++++++++++---
 .../apache/sshd/agent/common/AgentDelegate.java |   5 +
 .../sshd/agent/local/AgentForwardedChannel.java |  15 ++-
 .../org/apache/sshd/agent/local/AgentImpl.java  |  32 +++--
 .../org/apache/sshd/agent/unix/AgentClient.java |  45 +++++--
 .../org/apache/sshd/agent/unix/AgentServer.java |  90 ++++++++++---
 .../sshd/agent/unix/AgentServerProxy.java       | 125 +++++++++++++------
 .../sshd/agent/unix/ChannelAgentForwarding.java |  90 +++++++++----
 .../sshd/agent/unix/UnixAgentFactory.java       |  54 +++++++-
 .../client/channel/AbstractClientChannel.java   |   3 +-
 .../sshd/client/channel/ChannelSession.java     |  56 ++++++---
 .../sshd/client/channel/ChannelSubsystem.java   |   7 +-
 .../sshd/common/channel/AbstractChannel.java    |  41 +++++-
 .../sshd/common/forward/TcpipServerChannel.java |  58 +++++++--
 .../sshd/common/util/threads/ThreadUtils.java   |  83 ++++++------
 .../apache/sshd/server/command/ScpCommand.java  |   2 +-
 .../apache/sshd/server/sftp/SftpSubsystem.java  |   2 +-
 .../test/java/org/apache/sshd/AgentTest.java    |  35 +++---
 .../apache/sshd/git/pack/GitPackCommand.java    |  11 +-
 .../org/apache/sshd/git/pgm/GitPgmCommand.java  |  11 +-
 25 files changed, 665 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/src/tomcat-apr-5.5.23-sources.jar
----------------------------------------------------------------------
diff --git a/src/tomcat-apr-5.5.23-sources.jar 
b/src/tomcat-apr-5.5.23-sources.jar
new file mode 100644
index 0000000..5f6fd04
Binary files /dev/null and b/src/tomcat-apr-5.5.23-sources.jar differ

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/SshClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshClient.java 
b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
index 4f74d61..7e13195 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
@@ -240,9 +240,12 @@ public class SshClient extends AbstractFactoryManager 
implements ClientFactoryMa
                     public void run() {
                         connector = null;
                         ioServiceFactory = null;
-                        if (shutdownExecutor && executor != null) {
-                            executor.shutdownNow();
-                            executor = null;
+                        if (shutdownExecutor && (executor != null) && 
(!executor.isShutdown())) {
+                            try {
+                                executor.shutdownNow();
+                            } finally {
+                                executor = null;
+                            }
                         }
                     }
                 })

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/SshServer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshServer.java 
b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
index 87ce889..024fc76 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
@@ -323,9 +323,12 @@ public class SshServer extends AbstractFactoryManager 
implements ServerFactoryMa
                     public void run() {
                         acceptor = null;
                         ioServiceFactory = null;
-                        if (shutdownExecutor && executor != null) {
-                            executor.shutdownNow();
-                            executor = null;
+                        if (shutdownExecutor && (executor != null) && 
(!executor.isShutdown())) {
+                            try {
+                                executor.shutdownNow();
+                            } finally {
+                                executor = null;
+                            }
                         }
                     }
                 })

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/SshAgent.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/SshAgent.java 
b/sshd-core/src/main/java/org/apache/sshd/agent/SshAgent.java
index b3b2ff5..4303721 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/SshAgent.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/SshAgent.java
@@ -18,7 +18,6 @@
  */
 package org.apache.sshd.agent;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.security.KeyPair;
 import java.security.PublicKey;
@@ -27,7 +26,7 @@ import java.util.List;
 /**
  * SSH key agent server
  */
-public interface SshAgent extends Closeable {
+public interface SshAgent extends java.nio.channels.Channel {
 
     public static final String SSH_AUTHSOCKET_ENV_NAME = "SSH_AUTH_SOCK";
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentClient.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentClient.java 
b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentClient.java
index 44d6c23..9e48c7d 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentClient.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentClient.java
@@ -36,6 +36,7 @@ import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.common.util.AbstractLoggingBean;
 import org.apache.sshd.common.util.KeyUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 
 public abstract class AbstractAgentClient extends AbstractLoggingBean {
@@ -43,7 +44,7 @@ public abstract class AbstractAgentClient extends 
AbstractLoggingBean {
     private final Buffer buffer = new ByteArrayBuffer();
     private final SshAgent agent;
 
-    public AbstractAgentClient(SshAgent agent) {
+    protected AbstractAgentClient(SshAgent agent) {
         this.agent = agent;
     }
 
@@ -76,8 +77,7 @@ public abstract class AbstractAgentClient extends 
AbstractLoggingBean {
     protected void process(Buffer req, Buffer rep) throws Exception {
         int cmd = req.getByte();
         switch (cmd) {
-            case SSH2_AGENTC_REQUEST_IDENTITIES:
-            {
+            case SSH2_AGENTC_REQUEST_IDENTITIES: {
                 List<SshAgent.Pair<PublicKey,String>> keys = 
agent.getIdentities();
                 rep.putByte(SSH2_AGENT_IDENTITIES_ANSWER);
                 rep.putInt(keys.size());
@@ -87,11 +87,14 @@ public abstract class AbstractAgentClient extends 
AbstractLoggingBean {
                 }
                 break;
             }
-            case SSH2_AGENTC_SIGN_REQUEST:
-            {
+            case SSH2_AGENTC_SIGN_REQUEST: {
                 PublicKey key = req.getPublicKey();
                 byte[] data = req.getBytes();
                 int flags = req.getInt();
+                if (log.isDebugEnabled()) {
+                    log.debug("SSH2_AGENTC_SIGN_REQUEST key={}, flags=0x{}, data={}",
+                              key.getAlgorithm(), Integer.toHexString(flags), BufferUtils.printHex(':', data));
+                }
                 Buffer sig = new ByteArrayBuffer();
                 sig.putString(KeyUtils.getKeyType(key));
                 sig.putBytes(agent.sign(key, data));
@@ -99,30 +102,27 @@ public abstract class AbstractAgentClient extends 
AbstractLoggingBean {
                 rep.putBytes(sig.array(), sig.rpos(), sig.available());
                 break;
             }
-            case SSH2_AGENTC_ADD_IDENTITY:
-            {
+            case SSH2_AGENTC_ADD_IDENTITY: {
                 agent.addIdentity(req.getKeyPair(), req.getString());
                 rep.putByte(SSH_AGENT_SUCCESS);
                 break;
             }
-            case SSH2_AGENTC_REMOVE_IDENTITY:
-            {
+            case SSH2_AGENTC_REMOVE_IDENTITY: {
                 PublicKey key = req.getPublicKey();
                 agent.removeIdentity(key);
                 rep.putByte(SSH_AGENT_SUCCESS);
                 break;
             }
             case SSH2_AGENTC_REMOVE_ALL_IDENTITIES:
-            {
                 agent.removeAllIdentities();
                 rep.putByte(SSH_AGENT_SUCCESS);
                 break;
-            }
             default:
-            {
+                if (log.isDebugEnabled()) {
+                    log.debug("Unknown command: {}", Integer.valueOf(cmd));
+                }
+
                 rep.putByte(SSH2_AGENT_FAILURE);
-                break;
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java 
b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
index 7af4ee2..61df6ae 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
@@ -31,14 +31,46 @@ import java.io.IOException;
 import java.security.KeyPair;
 import java.security.PublicKey;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.util.AbstractLoggingBean;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
 
-public abstract class AbstractAgentProxy implements SshAgent {
+public abstract class AbstractAgentProxy extends AbstractLoggingBean 
implements SshAgent, ExecutorServiceConfigurer {
+    private ExecutorService executor;
+    private boolean shutdownExecutor;
+
+    protected AbstractAgentProxy() {
+        super();
+    }
+
+    @Override
+    public ExecutorService getExecutorService() {
+        return executor;
+    }
+
+    @Override
+    public void setExecutorService(ExecutorService service) {
+        executor = service;
+    }
+
+    @Override
+    public boolean isShutdownOnExit() {
+        return shutdownExecutor;
+    }
+
+    @Override
+    public void setShutdownOnExit(boolean shutdown) {
+        shutdownExecutor = shutdown;
+    }
 
     @Override
     public List<Pair<PublicKey, String>> getIdentities() throws IOException {
@@ -46,13 +78,15 @@ public abstract class AbstractAgentProxy implements 
SshAgent {
         buffer = request(prepare(buffer));
         int type = buffer.getByte();
         if (type != SSH2_AGENT_IDENTITIES_ANSWER) {
-            throw new SshException("SSH agent failure");
+            throw new SshException("Bad agent identities answer: " + type);
         }
+
         int nbIdentities = buffer.getInt();
         if (nbIdentities > 1024) {
-            throw new SshException("SSH agent failure");
+            throw new SshException("Bad identities count: " + nbIdentities);
         }
-        List<Pair<PublicKey, String>> keys = new ArrayList<Pair<PublicKey, 
String>>();
+
+        List<Pair<PublicKey, String>> keys = new ArrayList<Pair<PublicKey, 
String>>(nbIdentities);
         for (int i = 0; i < nbIdentities; i++) {
             PublicKey key = buffer.getPublicKey();
             keys.add(new Pair<PublicKey, String>(key, buffer.getString()));
@@ -67,12 +101,19 @@ public abstract class AbstractAgentProxy implements 
SshAgent {
         buffer.putBytes(data);
         buffer.putInt(0);
         buffer = request(prepare(buffer));
-        if (buffer.getByte() != SSH2_AGENT_SIGN_RESPONSE) {
-            throw new SshException("SSH agent failure");
+        
+        byte responseType = buffer.getByte(); 
+        if (responseType != SSH2_AGENT_SIGN_RESPONSE) {
+            throw new SshException("Bad signing response type: " + 
(responseType & 0xFF));
         }
         Buffer buf = new ByteArrayBuffer(buffer.getBytes());
-        buf.getString(); // algo
-        return buf.getBytes();
+        String algorithm = buf.getString();
+        byte[] signature = buf.getBytes(); 
+        if (log.isDebugEnabled()) {
+            log.debug("sign(" + algorithm + "): " + BufferUtils.printHex(':', 
signature));
+        }
+
+        return signature;
     }
 
     @Override
@@ -80,9 +121,15 @@ public abstract class AbstractAgentProxy implements 
SshAgent {
         Buffer buffer = createBuffer(SSH2_AGENTC_ADD_IDENTITY);
         buffer.putKeyPair(key);
         buffer.putString(comment);
+        if (log.isDebugEnabled()) {
+            log.debug("addIdentity(" + comment + "): " + 
key.getPublic().getAlgorithm());
+        }
         buffer = request(prepare(buffer));
-        if (buffer.available() != 1 || buffer.getByte() != SSH_AGENT_SUCCESS) {
-            throw new SshException("SSH agent failure");
+        
+        int available = buffer.available();
+        byte response = (available >= 1) ? buffer.getByte() : -1;
+        if ((available != 1) || (response != SSH_AGENT_SUCCESS)) {
+            throw new SshException("Bad addIdentity response (" + (response & 
0xFF) + ") - available=" + available);
         }
     }
 
@@ -90,24 +137,43 @@ public abstract class AbstractAgentProxy implements 
SshAgent {
     public void removeIdentity(PublicKey key) throws IOException {
         Buffer buffer = createBuffer(SSH2_AGENTC_REMOVE_IDENTITY);
         buffer.putPublicKey(key);
+        if (log.isDebugEnabled()) {
+            log.debug("removeIdentity: " + key.getAlgorithm());
+        }
+
         buffer = request(prepare(buffer));
-        if (buffer.available() != 1 || buffer.getByte() != SSH_AGENT_SUCCESS) {
-            throw new SshException("SSH agent failure");
+
+        int available = buffer.available();
+        byte response = (available >= 1) ? buffer.getByte() : -1;
+        if ((available != 1) || (response != SSH_AGENT_SUCCESS)) {
+            throw new SshException("Bad removeIdentity response (" + (response 
& 0xFF) + ") - available=" + available);
         }
     }
 
     @Override
     public void removeAllIdentities() throws IOException {
         Buffer buffer = createBuffer(SSH2_AGENTC_REMOVE_ALL_IDENTITIES);
+        if (log.isDebugEnabled()) {
+            log.debug("removeAllIdentities");
+        }
         buffer = request(prepare(buffer));
-        if (buffer.available() != 1 || buffer.getByte() != SSH_AGENT_SUCCESS) {
-            throw new SshException("SSH agent failure");
+
+        int available = buffer.available();
+        byte response = (available >= 1) ? buffer.getByte() : -1;
+        if ((available != 1) || (response != SSH_AGENT_SUCCESS)) {
+            throw new SshException("Bad removeAllIdentities response (" + 
(response & 0xFF) + ") - available=" + available);
         }
     }
 
     @Override
     public void close() throws IOException {
-        // nothing
+        ExecutorService service = getExecutorService();
+        if ((service != null) && isShutdownOnExit() && 
(!service.isShutdown())) {
+            Collection<?> runners = service.shutdownNow();
+            if (log.isDebugEnabled()) {
+                log.debug("close() - shutdown runners count=" + 
GenericUtils.size(runners));
+            }
+        }
     }
 
     protected Buffer createBuffer(byte cmd) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentDelegate.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentDelegate.java 
b/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentDelegate.java
index 26df439..ed74fb6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentDelegate.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentDelegate.java
@@ -34,6 +34,11 @@ public class AgentDelegate implements SshAgent {
     }
 
     @Override
+    public boolean isOpen() {
+        return agent.isOpen();
+    }
+
+    @Override
     public void close() throws IOException {
         // ignored
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
index ea0779a..0318ac1 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.agent.common.AbstractAgentProxy;
@@ -42,15 +43,25 @@ public class AgentForwardedChannel extends 
AbstractClientChannel {
 
     public SshAgent getAgent() {
         return new AbstractAgentProxy() {
+            private final AtomicBoolean open = new AtomicBoolean(true);
+
+            @Override
+            public boolean isOpen() {
+                return open.get();
+            }
+
             @Override
             protected Buffer request(Buffer buffer) throws IOException {
                 return AgentForwardedChannel.this.request(buffer);
             }
+
             @Override
             public void close() throws IOException {
-                AgentForwardedChannel.this.close(false);
+                if (open.getAndSet(false)) {
+                    AgentForwardedChannel.this.close(false);
+                    super.close();
+                }
             }
-
         };
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentImpl.java 
b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentImpl.java
index d4c6850..5ccea64 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentImpl.java
@@ -28,6 +28,7 @@ import java.security.interfaces.RSAPublicKey;
 import java.security.spec.ECParameterSpec;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.common.Signature;
@@ -40,13 +41,23 @@ import org.apache.sshd.common.signature.BuiltinSignatures;
 public class AgentImpl implements SshAgent {
 
     private final List<Pair<KeyPair, String>> keys = new 
ArrayList<Pair<KeyPair, String>>();
-    private boolean closed;
+    private final AtomicBoolean open = new AtomicBoolean(true);
+
+    public AgentImpl() {
+        super();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open.get();
+    }
 
     @Override
     public List<Pair<PublicKey, String>> getIdentities() throws IOException {
-        if (closed) {
+        if (!isOpen()) {
             throw new SshException("Agent closed");
         }
+
         List<Pair<PublicKey, String>> pks = new ArrayList<Pair<PublicKey, 
String>>();
         for (Pair<KeyPair, String> kp : keys) {
             pks.add(new Pair<PublicKey, String>(kp.getFirst().getPublic(), 
kp.getSecond()));
@@ -56,15 +67,17 @@ public class AgentImpl implements SshAgent {
 
     @Override
     public byte[] sign(PublicKey key, byte[] data) throws IOException {
-        if (closed) {
+        if (!isOpen()) {
             throw new SshException("Agent closed");
         }
+
         Pair<KeyPair, String> kp = getKeyPair(keys, key);
         if (kp == null) {
             throw new SshException("Key not found");
         }
+
         try {
-            Signature verif;
+            final Signature verif;
             if (kp.getFirst().getPublic() instanceof DSAPublicKey) {
                 verif = BuiltinSignatures.dsa.create();
             } else if (kp.getFirst().getPublic() instanceof ECPublicKey) {
@@ -87,7 +100,7 @@ public class AgentImpl implements SshAgent {
 
     @Override
     public void addIdentity(KeyPair key, String comment) throws IOException {
-        if (closed) {
+        if (!isOpen()) {
             throw new SshException("Agent closed");
         }
         keys.add(new Pair<KeyPair, String>(key, comment));
@@ -95,7 +108,7 @@ public class AgentImpl implements SshAgent {
 
     @Override
     public void removeIdentity(PublicKey key) throws IOException {
-        if (closed) {
+        if (!isOpen()) {
             throw new SshException("Agent closed");
         }
         Pair<KeyPair, String> kp = getKeyPair(keys, key);
@@ -107,7 +120,7 @@ public class AgentImpl implements SshAgent {
 
     @Override
     public void removeAllIdentities() throws IOException {
-        if (closed) {
+        if (!isOpen()) {
             throw new SshException("Agent closed");
         }
         keys.clear();
@@ -115,8 +128,9 @@ public class AgentImpl implements SshAgent {
 
     @Override
     public void close() throws IOException {
-        closed = true;
-        keys.clear();
+        if (open.getAndSet(false)) {
+            keys.clear();
+        }
     }
 
     protected static SshAgent.Pair<KeyPair, String> 
getKeyPair(List<SshAgent.Pair<KeyPair, String>> keys, PublicKey key) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
index a8a5271..cd192be 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
@@ -22,11 +22,15 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sshd.agent.common.AbstractAgentProxy;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.tomcat.jni.Local;
 import org.apache.tomcat.jni.Pool;
 import org.apache.tomcat.jni.Socket;
@@ -42,11 +46,20 @@ public class AgentClient extends AbstractAgentProxy 
implements Runnable {
     private final long handle;
     private final Buffer receiveBuffer;
     private final Queue<Buffer> messages;
-    private boolean closed;
+    private Future<?> pumper;
+    private final AtomicBoolean open = new AtomicBoolean(true);
 
     public AgentClient(String authSocket) throws IOException {
+        this(authSocket, null, false);
+    }
+
+    public AgentClient(String authSocket, ExecutorService executor, boolean 
shutdownOnExit) throws IOException {
+        this.authSocket = authSocket;
+        
+        setExecutorService((executor == null) ? 
ThreadUtils.newSingleThreadExecutor("AgentClient[" + authSocket + "]") : 
executor);
+        setShutdownOnExit((executor == null) ? true : shutdownOnExit);
+
         try {
-            this.authSocket = authSocket;
             pool = Pool.create(AprLibrary.getInstance().getRootPool());
             handle = Local.create(authSocket, pool);
             int result = Local.connect(handle, 0);
@@ -55,7 +68,9 @@ public class AgentClient extends AbstractAgentProxy 
implements Runnable {
             }
             receiveBuffer = new ByteArrayBuffer();
             messages = new ArrayBlockingQueue<Buffer>(10);
-            new Thread(this).start();
+            
+            ExecutorService service = getExecutorService();
+            pumper = service.submit(this);
         } catch (IOException e) {
             throw e;
         } catch (Exception e) {
@@ -64,10 +79,15 @@ public class AgentClient extends AbstractAgentProxy 
implements Runnable {
     }
 
     @Override
+    public boolean isOpen() {
+        return open.get();
+    }
+
+    @Override
     public void run() {
         try {
             byte[] buf = new byte[1024];
-            while (!closed) {
+            while (isOpen()) {
                 int result = Socket.recv(handle, buf, 0, buf.length);
                 if (result < Status.APR_SUCCESS) {
                     throwException(result);
@@ -75,14 +95,16 @@ public class AgentClient extends AbstractAgentProxy 
implements Runnable {
                 messageReceived(new ByteArrayBuffer(buf, 0, result));
             }
         } catch (Exception e) {
-            if (!closed) {
-                e.printStackTrace();
+            if (isOpen()) {
+                log.warn(e.getClass().getSimpleName() + " while still open: " + e.getMessage());
             }
         } finally {
             try {
                 close();
             } catch(IOException e) {
-                e.printStackTrace();
+                if (log.isDebugEnabled()) {
+                    log.debug(e.getClass().getSimpleName() + " while closing: " + e.getMessage());
+                }
             }
         }
     }
@@ -111,10 +133,15 @@ public class AgentClient extends AbstractAgentProxy 
implements Runnable {
 
     @Override
     public void close() throws IOException {
-        if (!closed) {
-            closed = true;
+        if (open.getAndSet(false)) {
             Socket.close(handle);
         }
+        
+        if ((pumper != null) && isShutdownOnExit() && (!pumper.isDone())) {
+            pumper.cancel(true);
+        }
+
+        super.close();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
index 52146da..f176790 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
@@ -20,13 +20,19 @@ package org.apache.sshd.agent.unix;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.agent.common.AbstractAgentClient;
 import org.apache.sshd.agent.local.AgentImpl;
 import org.apache.sshd.common.util.AbstractLoggingBean;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
+import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.tomcat.jni.Local;
 import org.apache.tomcat.jni.Pool;
 import org.apache.tomcat.jni.Socket;
@@ -36,26 +42,44 @@ import org.apache.tomcat.jni.Status;
 /**
  * A server for an SSH Agent
  */
-public class AgentServer extends AbstractLoggingBean implements Closeable {
+public class AgentServer extends AbstractLoggingBean implements Closeable, 
ExecutorServiceCarrier {
 
     private final SshAgent agent;
+    private final ExecutorService service;
+    private final boolean shutdownExecutor;
+    private Future<?> agentThread;
     private String authSocket;
     private long pool;
     private long handle;
-    private Thread thread;
 
     public AgentServer() {
-        this(new AgentImpl());
+        this(null, false);
     }
 
-    public AgentServer(SshAgent agent) {
+    public AgentServer(ExecutorService executor, boolean shutdownOnExit) {
+        this(new AgentImpl(), executor, shutdownOnExit);
+    }
+
+    public AgentServer(SshAgent agent, ExecutorService executor, boolean 
shutdownOnExit) {
         this.agent = agent;
+        this.service = (executor == null) ? 
ThreadUtils.newSingleThreadExecutor("AgentServer[" + agent + "]") : executor;
+        this.shutdownExecutor = (service == executor) ?  shutdownOnExit : true;
     }
 
     public SshAgent getAgent() {
         return agent;
     }
 
+    @Override
+    public ExecutorService getExecutorService() {
+        return service;
+    }
+
+    @Override
+    public boolean isShutdownOnExit() {
+        return shutdownExecutor;
+    }
+
     public String start() throws Exception {
         authSocket = AprLibrary.createLocalSocketAddress();
         pool = Pool.create(AprLibrary.getInstance().getRootPool());
@@ -69,29 +93,56 @@ public class AgentServer extends AbstractLoggingBean 
implements Closeable {
         if (result != Status.APR_SUCCESS) {
             throwException(result);
         }
-        thread = new Thread() {
-            @SuppressWarnings("synthetic-access")
-            @Override
-            public void run() {
-                try {
-                    while (true) {
-                        long clientSock = Local.accept(handle);
-                        Socket.timeoutSet(clientSock, 10000000);
-                        new SshAgentSession(clientSock, agent);
+        
+        ExecutorService executor = getExecutorService();
+        agentThread = executor.submit(new Runnable() {
+                @SuppressWarnings("synthetic-access")
+                @Override
+                public void run() {
+                    try {
+                        while (true) {
+                            long clientSock = Local.accept(handle);
+                            Socket.timeoutSet(clientSock, 10000000);    // TODO make this configurable
+                            new SshAgentSession(clientSock, agent).run();
+                        }
+                    } catch (Exception e) {
+                        log.error("Failed to run session", e);
                     }
-                } catch (Exception e) {
-                    log.error("Failed to run session", e);
                 }
-            }
-        };
-        thread.start();
+            });
         return authSocket;
     }
 
     @Override
     public void close() throws IOException {
-        agent.close();
+        IOException err = null;
+        try {
+            agent.close();
+        } catch(IOException e) {
+            err = e;
+        }
+
         Socket.close(handle);
+        
+        try {
+            if ((agentThread != null) && (!agentThread.isDone())) {
+                agentThread.cancel(true);
+            }
+        } finally {
+            agentThread = null;
+        }
+        
+        ExecutorService executor = getExecutorService();
+        if ((executor != null) && isShutdownOnExit() && 
(!executor.isShutdown())) {
+            Collection<?> runners = executor.shutdownNow();
+            if (log.isDebugEnabled()) {
+                log.debug("Shut down runners count=" + 
GenericUtils.size(runners));
+            }
+        }
+        
+        if (err != null) {
+            throw err;
+        }
     }
 
     protected static class SshAgentSession extends AbstractAgentClient 
implements Runnable {
@@ -101,7 +152,6 @@ public class AgentServer extends AbstractLoggingBean 
implements Closeable {
         public SshAgentSession(long socket, SshAgent agent) {
             super(agent);
             this.socket = socket;
-            new Thread(this).start();
         }
 
         @SuppressWarnings("synthetic-access")

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
index aa40241..ab84cee 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
@@ -20,13 +20,21 @@ package org.apache.sshd.agent.unix;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.Channel;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sshd.agent.SshAgentServer;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.util.AbstractLoggingBean;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.OsUtils;
+import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
+import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.tomcat.jni.Local;
 import org.apache.tomcat.jni.Pool;
 import org.apache.tomcat.jni.Socket;
@@ -35,19 +43,25 @@ import org.apache.tomcat.jni.Status;
 /**
 * The server side fake agent, acting as an agent, but actually forwarding the requests to the auth channel on the client side.
  */
-public class AgentServerProxy extends AbstractLoggingBean implements 
SshAgentServer {
+public class AgentServerProxy extends AbstractLoggingBean implements 
SshAgentServer, ExecutorServiceCarrier, Channel {
     private final ConnectionService service;
     private final String authSocket;
     private final long pool;
     private final long handle;
-    private final Thread thread;
-    private volatile boolean closed;
-    private volatile boolean innerFinished;
+    private Future<?> piper;
+    private final ExecutorService pipeService;
+    private final boolean pipeCloseOnExit;
+    private final AtomicBoolean open = new AtomicBoolean(true);
+    private final AtomicBoolean innerFinished = new AtomicBoolean(false);
 
     //used to wake the Local.listen() JNI call
     private static final byte[] END_OF_STREAM_MESSAGE = new byte[] { 
"END_OF_STREAM".getBytes()[0] };
 
     public AgentServerProxy(ConnectionService service) throws IOException {
+        this(service, null, false);
+    }
+
+    public AgentServerProxy(ConnectionService service, ExecutorService 
executor, boolean shutdownOnExit) throws IOException {
         this.service = service;
         try {
             String authSocket = AprLibrary.createLocalSocketAddress();
@@ -66,40 +80,41 @@ public class AgentServerProxy extends AbstractLoggingBean 
implements SshAgentSer
             if (result != Status.APR_SUCCESS) {
                 throwException(result);
             }
-            thread = new Thread("sshd-AgentServerProxy-PIPE-" + authSocket) {
-                @SuppressWarnings("synthetic-access")
-                @Override
-                public void run() {
-                    try {
-                        while (!closed) {
-                            try {
-                                long clientSock = Local.accept(handle);
-                                if (closed) {
-                                    break;
-                                }
-                                Socket.timeoutSet(clientSock, 10000000);
-                                AgentForwardedChannel channel = new 
AgentForwardedChannel(clientSock);
-                                
AgentServerProxy.this.service.registerChannel(channel);
-                                OpenFuture future = channel.open().await();
-                                Throwable t = future.getException();
-                                if (t instanceof Exception) {
-                                    throw (Exception) t;
-                                } else if (t != null) {
-                                    throw new Exception(t);
-                                }
-                            } catch (Exception e) {
-                                if (!closed) {
-                                    log.info("Exchange caught in 
authentication forwarding", e);
+            
+            pipeService = (executor == null) ? 
ThreadUtils.newSingleThreadExecutor("sshd-AgentServerProxy-PIPE-" + authSocket) 
: executor;
+            pipeCloseOnExit = (executor == pipeService) ? shutdownOnExit : 
true;
+            piper = pipeService.submit(new Runnable() {
+                    @SuppressWarnings("synthetic-access")
+                    @Override
+                    public void run() {
+                        try {
+                            while (isOpen()) {
+                                try {
+                                    long clientSock = Local.accept(handle);
+                                    if (!isOpen()) {
+                                        break;
+                                    }
+                                    Socket.timeoutSet(clientSock, 10000000);    // TODO allow to configure this
+                                    AgentForwardedChannel channel = new 
AgentForwardedChannel(clientSock);
+                                    
AgentServerProxy.this.service.registerChannel(channel);
+                                    OpenFuture future = channel.open().await();
+                                    Throwable t = future.getException();
+                                    if (t instanceof Exception) {
+                                        throw (Exception) t;
+                                    } else if (t != null) {
+                                        throw new Exception(t);
+                                    }
+                                } catch (Exception e) {
+                                    if (isOpen()) {
+                                        log.info(e.getClass().getSimpleName() + " while authentication forwarding: " + e.getMessage(), e);
+                                    }
                                 }
                             }
+                        } finally {
+                            innerFinished.set(true);
                         }
-                    } finally {
-                        innerFinished = true;
                     }
-                }
-            };
-            thread.setDaemon(true);
-            thread.start();
+                });
         } catch (IOException e) {
             throw e;
         } catch (Exception e) {
@@ -108,25 +123,40 @@ public class AgentServerProxy extends AbstractLoggingBean 
implements SshAgentSer
     }
 
     @Override
+    public boolean isOpen() {
+        return open.get();
+    }
+
+    @Override
+    public ExecutorService getExecutorService() {
+        return pipeService;
+    }
+
+    @Override
+    public boolean isShutdownOnExit() {
+        return pipeCloseOnExit;
+    }
+
+    @Override
     public String getId() {
         return authSocket;
     }
 
     @Override
     public synchronized void close() throws IOException {
-        if (closed) {
-            return;
+        if (!open.getAndSet(false)) {
+            return; // already closed (or closing)
         }
-        closed = true;
+
         final boolean isDebug = log.isDebugEnabled();
 
         if (handle != 0) {
-            if (!innerFinished) {
+            if (!innerFinished.get()) {
                 try {
 
                     final long tmpPool = 
Pool.create(AprLibrary.getInstance().getRootPool());
                     final long tmpSocket = Local.create(authSocket, tmpPool);
-                    long connectResult = Local.connect(tmpSocket, 0);
+                    long connectResult = Local.connect(tmpSocket, 0L);
 
                     if (connectResult != Status.APR_SUCCESS) {
                         if (isDebug) {
@@ -157,7 +187,6 @@ public class AgentServerProxy extends AbstractLoggingBean 
implements SshAgentSer
 
         try {
             if (authSocket != null) {
-
                 final File socketFile = new File(authSocket);
                 if (socketFile.exists()) {
                     if (socketFile.delete()) {
@@ -182,6 +211,22 @@ public class AgentServerProxy extends AbstractLoggingBean 
implements SshAgentSer
                 log.debug("Exception deleting the PIPE socket: " + authSocket, 
e);
             }
         }
+        
+        try {
+            if ((piper != null) && (!piper.isDone())) {
+                piper.cancel(true);
+            }
+        } finally {
+            piper = null;
+        }
+        
+        ExecutorService executor = getExecutorService();
+        if ((executor != null) && isShutdownOnExit() && 
(!executor.isShutdown())) {
+            Collection<?>   runners = executor.shutdownNow();
+            if (log.isDebugEnabled()) {
+                log.debug("Shut down runners count=" + 
GenericUtils.size(runners));
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
index 1913652..5f7cb15 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
@@ -20,6 +20,9 @@ package org.apache.sshd.agent.unix;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.client.future.DefaultOpenFuture;
@@ -31,7 +34,10 @@ import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
+import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.channel.AbstractServerChannel;
 import org.apache.tomcat.jni.Local;
 import org.apache.tomcat.jni.Pool;
@@ -43,7 +49,7 @@ import org.apache.tomcat.jni.Status;
  */
 public class ChannelAgentForwarding extends AbstractServerChannel {
 
-    public static class ChannelAgentForwardingFactory implements 
NamedFactory<Channel> {
+    public static class ChannelAgentForwardingFactory implements 
NamedFactory<Channel>, ExecutorServiceCarrier {
         public static final ChannelAgentForwardingFactory INSTANCE = new 
ChannelAgentForwardingFactory();
         
         public ChannelAgentForwardingFactory() {
@@ -55,19 +61,35 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
             return "auth-agent@openssh.com";
         }
 
+        @Override   // user can override to provide an alternative
+        public ExecutorService getExecutorService() {
+            return null;
+        }
+
+        @Override
+        public boolean isShutdownOnExit() {
+            return false;
+        }
+
         @Override
         public Channel create() {
-            return new ChannelAgentForwarding();
+            ChannelAgentForwarding  channel = new ChannelAgentForwarding();
+            channel.setExecutorService(getExecutorService());
+            channel.setShutdownOnExit(isShutdownOnExit());
+            return channel;
         }
     }
 
     private String authSocket;
     private long pool;
     private long handle;
-    private Thread thread;
     private OutputStream out;
+    private ExecutorService forwardService;
+    private Future<?> forwarder;
+    private boolean shutdownForwarder;
 
     public ChannelAgentForwarding() {
+        super();
     }
 
     @Override
@@ -82,25 +104,28 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
             if (result != Status.APR_SUCCESS) {
                 throwException(result);
             }
-            thread = new Thread() {
-                @SuppressWarnings("synthetic-access")
-                @Override
-                public void run() {
-                    try {
-                        byte[] buf = new byte[1024];
-                        while (true) {
-                            int len = Socket.recv(handle, buf, 0, buf.length);
-                            if (len > 0) {
-                                out.write(buf, 0, len);
-                                out.flush();
+            
+            ExecutorService service = getExecutorService();
+            forwardService = (service == null) ? ThreadUtils.newSingleThreadExecutor("ChannelAgentForwarding[" + authSocket + "]") : service;
+            shutdownForwarder = (service == forwardService) ? 
isShutdownOnExit() : true;
+            forwarder = forwardService.submit(new Runnable() {
+                    @SuppressWarnings("synthetic-access")
+                    @Override
+                    public void run() {
+                        try {
+                            byte[] buf = new byte[1024];
+                            while (true) {
+                                int len = Socket.recv(handle, buf, 0, 
buf.length);
+                                if (len > 0) {
+                                    out.write(buf, 0, len);
+                                    out.flush();
+                                }
                             }
+                        } catch (IOException e) {
+                            close(true);
                         }
-                    } catch (IOException e) {
-                        close(true);
                     }
-                }
-            };
-            thread.start();
+                });
             f.setOpened();
 
         } catch (Exception e) {
@@ -118,8 +143,27 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
         super.close(true);
 
         // We also need to close the socket.
-        //
         Socket.close(handle);
+        
+        try {
+            if ((forwarder != null) && (!forwarder.isDone())) {
+                forwarder.cancel(true);
+            }
+        } finally {
+            forwarder = null;
+        }
+        
+        try {
+            if  ((forwardService != null) && shutdownForwarder) {
+                Collection<?> runners = forwardService.shutdownNow();
+                if (log.isDebugEnabled()) {
+                    log.debug("Shut down runners count=" + 
GenericUtils.size(runners));
+                }
+            }
+        } finally {
+            forwardService = null;
+            shutdownForwarder = false;
+        }
     }
 
     @Override
@@ -157,9 +201,7 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
      * @param code APR error code
      * @throws java.io.IOException the produced exception for the given APR error number
      */
-    private void throwException(int code) throws IOException {
-        throw new IOException(
-                org.apache.tomcat.jni.Error.strerror(-code) +
-                " (code: " + code + ")");
+    private static void throwException(int code) throws IOException {
+        throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " 
(code: " + code + ")");
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
index 17e8160..61d5a28 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
@@ -19,6 +19,7 @@
 package org.apache.sshd.agent.unix;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.agent.SshAgentFactory;
@@ -31,13 +32,57 @@ import org.apache.sshd.common.Session;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
 import org.apache.sshd.server.session.ServerSession;
 
-public class UnixAgentFactory implements SshAgentFactory {
+public class UnixAgentFactory implements SshAgentFactory, 
ExecutorServiceConfigurer {
+    private ExecutorService executor;
+    private boolean shutdownExecutor;
+    private final NamedFactory<Channel> factory = new 
ChannelAgentForwarding.ChannelAgentForwardingFactory() {
+        @Override
+        public ExecutorService getExecutorService() {
+            return UnixAgentFactory.this.getExecutorService();
+        }
+
+        @Override
+        public boolean isShutdownOnExit() {
+            return UnixAgentFactory.this.isShutdownOnExit();
+        }
+        
+    };
+
+    public UnixAgentFactory() {
+        super();
+    }
+
+    public UnixAgentFactory(ExecutorService service, boolean shutdown) {
+        executor = service;
+        shutdownExecutor = shutdown;
+    }
+
+    @Override
+    public ExecutorService getExecutorService() {
+        return executor;
+    }
+
+    @Override
+    public void setExecutorService(ExecutorService service) {
+        executor = service;
+    }
+
+    @Override
+    public boolean isShutdownOnExit() {
+        return shutdownExecutor;
+    }
+
+    @Override
+    public void setShutdownOnExit(boolean shutdown) {
+        shutdownExecutor = shutdown;
+    }
 
     @Override
     public NamedFactory<Channel> getChannelForwardingFactory() {
-        return ChannelAgentForwarding.ChannelAgentForwardingFactory.INSTANCE;
+        return factory;
     }
 
     @Override
@@ -46,7 +91,8 @@ public class UnixAgentFactory implements SshAgentFactory {
         if (GenericUtils.isEmpty(authSocket)) {
             throw new SshException("No " + SshAgent.SSH_AUTHSOCKET_ENV_NAME + 
" value");
         }
-        return new AgentClient(authSocket);
+
+        return new AgentClient(authSocket, getExecutorService(), 
isShutdownOnExit());
     }
 
     @Override
@@ -55,6 +101,6 @@ public class UnixAgentFactory implements SshAgentFactory {
         if (!(session instanceof ServerSession)) {
             throw new IllegalStateException("The session used to create an 
agent server proxy must be a server session");
         }
-        return new AgentServerProxy(service);
+        return new AgentServerProxy(service, getExecutorService(), 
isShutdownOnExit());
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 1f434b0..e690667 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -212,8 +212,7 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
                 }
                 try {
                     if (log.isTraceEnabled()) {
-                        log.trace("Waiting for lock on channel {}, mask={}, 
cond={}",
-                                  new Object[] { this, Integer.valueOf(mask), 
Integer.valueOf(cond) });
+                        log.trace("Waiting for lock on channel {}, mask={}, 
cond={}", this, Integer.valueOf(mask), Integer.valueOf(cond));
                     }
                     if (timeout > 0) {
                         lock.wait(timeout);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index 91d6153..102e85f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -20,14 +20,17 @@ package org.apache.sshd.client.channel;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.channel.ChannelPipedInputStream;
 import org.apache.sshd.common.channel.ChannelPipedOutputStream;
-import org.apache.sshd.common.channel.ChannelAsyncInputStream;
-import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.util.threads.ThreadUtils;
 
 /**
  * TODO Add javadoc
@@ -35,8 +38,9 @@ import org.apache.sshd.common.future.CloseFuture;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class ChannelSession extends AbstractClientChannel {
-
-    private Thread streamPumper;
+    private ExecutorService pumperService;
+    private Future<?> pumper;
+    private boolean shutdownPumper;
 
     public ChannelSession() {
         super("session");
@@ -73,28 +77,50 @@ public class ChannelSession extends AbstractClientChannel {
                 err = pos;
                 invertedErr = pis;
             }
+
             if (in != null) {
-                streamPumper = new Thread("ClientInputStreamPump") {
-                    @Override
-                    public void run() {
-                        pumpInputStream();
-                    }
-                };
+                // allocate a temporary executor service if none provided
+                ExecutorService service = getExecutorService();
+                if ((pumperService = service) == null) {
+                    pumperService = ThreadUtils.newSingleThreadExecutor("ClientInputStreamPump[" + this.toString() + "]");
+                }
+                
+                // shutdown the temporary executor service if had to create it
+                shutdownPumper = (pumperService == service) ? 
isShutdownOnExit() : true;
+
                 // Interrupt does not really work and the thread will only exit when
                 // the call to read() will return.  So ensure this thread is a daemon
                 // to avoid blocking the whole app
-                streamPumper.setDaemon(true);
-                streamPumper.start();
+                pumper = pumperService.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            pumpInputStream();
+                        }
+                    });
             }
         }
     }
 
     @Override
     protected void doCloseImmediately() {
-        if (streamPumper != null) {
-            streamPumper.interrupt();
-            streamPumper = null;
+        if ((pumper != null) && (pumperService != null) && shutdownPumper && 
(!pumperService.isShutdown())) {
+            try {
+                if (!pumper.isDone()) {
+                    pumper.cancel(true);
+                }
+                
+                pumperService.shutdownNow();
+            } catch(Exception e) {
+                // we log it as DEBUG since it is relatively harmless
+                if (log.isDebugEnabled()) {
+                    log.debug("Failed (" + e.getClass().getSimpleName() + ") 
to shutdown stream pumper: " + e.getMessage());
+                }
+            } finally {
+                pumper = null;
+                pumperService = null;
+            }
         }
+
         super.doCloseImmediately();
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSubsystem.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSubsystem.java 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSubsystem.java
index 9250847..73b132a 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSubsystem.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSubsystem.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 
 /**
@@ -35,10 +37,7 @@ public class ChannelSubsystem extends ChannelSession {
     private final String subsystem;
 
     public ChannelSubsystem(String subsystem) {
-        if (subsystem == null) {
-            throw new IllegalArgumentException("subsystem must not be null");
-        }
-        this.subsystem = subsystem;
+        this.subsystem = ValidateUtils.checkNotNullAndNotEmpty(subsystem, 
"Subsystem may not be null/empty", GenericUtils.EMPTY_OBJECT_ARRAY);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 6387cac..6abe007 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -20,7 +20,9 @@ package org.apache.sshd.common.channel;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -37,16 +39,20 @@ import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.util.CloseableUtils;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
 
 /**
  * TODO Add javadoc
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public abstract class AbstractChannel extends 
CloseableUtils.AbstractInnerCloseable implements Channel {
+public abstract class AbstractChannel
+                extends CloseableUtils.AbstractInnerCloseable
+                implements Channel, ExecutorServiceConfigurer {
 
     public static final int DEFAULT_WINDOW_SIZE = 0x200000;
     public static final int DEFAULT_PACKET_SIZE = 0x8000;
@@ -57,6 +63,8 @@ public abstract class AbstractChannel extends 
CloseableUtils.AbstractInnerClosea
         Opened, CloseSent, CloseReceived, Closed
     }
 
+    private ExecutorService executor;
+    private boolean shutdownExecutor;
     protected final Window localWindow = new Window(this, null, 
getClass().getName().contains(".client."), true);
     protected final Window remoteWindow = new Window(this, null, 
getClass().getName().contains(".client."), false);
     protected ConnectionService service;
@@ -106,6 +114,26 @@ public abstract class AbstractChannel extends 
CloseableUtils.AbstractInnerClosea
     }
 
     @Override
+    public ExecutorService getExecutorService() {
+        return executor;
+    }
+
+    @Override
+    public void setExecutorService(ExecutorService service) {
+        executor = service;
+    }
+
+    @Override
+    public boolean isShutdownOnExit() {
+        return shutdownExecutor;
+    }
+
+    @Override
+    public void setShutdownOnExit(boolean shutdown) {
+        shutdownExecutor = shutdown;
+    }
+
+    @Override
     public void handleRequest(Buffer buffer) throws IOException {
         String req = buffer.getString();
         boolean wantReply = buffer.getBoolean();
@@ -189,10 +217,12 @@ public abstract class AbstractChannel extends 
CloseableUtils.AbstractInnerClosea
         public boolean isClosing() {
             return closing;
         }
+
         @Override
         public boolean isClosed() {
             return gracefulFuture.isClosed();
         }
+
         @Override
         public CloseFuture close(boolean immediately) {
             closing = true;
@@ -226,6 +256,15 @@ public abstract class AbstractChannel extends 
CloseableUtils.AbstractInnerClosea
                     AbstractChannel.this.close(true);
                 }
             }
+            
+            ExecutorService service = getExecutorService();
+            if ((service != null) && isShutdownOnExit() && 
(!service.isShutdown())) {
+                Collection<?>   running = service.shutdownNow();
+                if (log.isDebugEnabled()) {
+                    log.debug("Shutdown executor service on close - running 
count=" + GenericUtils.size(running));
+                }
+            }
+
             return gracefulFuture;
         }
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
index 21e60df..255bfe3 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
@@ -21,6 +21,8 @@ package org.apache.sshd.common.forward;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.ConnectException;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
@@ -40,6 +42,8 @@ import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.util.Readable;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
+import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.channel.AbstractServerChannel;
 import org.apache.sshd.server.channel.OpenChannelException;
 
@@ -49,7 +53,7 @@ import org.apache.sshd.server.channel.OpenChannelException;
  * @author <a href="mailto:d...@mina.apache.org";>Apache MINA SSHD Project</a>
  */
 public class TcpipServerChannel extends AbstractServerChannel {
-    public abstract static class TcpipFactory implements NamedFactory<Channel> 
{
+    public abstract static class TcpipFactory implements 
NamedFactory<Channel>, ExecutorServiceCarrier {
         private final Type type;
 
         protected TcpipFactory(Type type) {
@@ -60,9 +64,22 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
             return type;
         }
         
+        @Override   // user can override to provide an alternative
+        public ExecutorService getExecutorService() {
+            return null;
+        }
+
+        @Override
+        public boolean isShutdownOnExit() {
+            return false;
+        }
+
         @Override
         public Channel create() {
-            return new TcpipServerChannel(getType());
+            TcpipServerChannel  channel = new TcpipServerChannel(getType());
+            channel.setExecutorService(getExecutorService());
+            channel.setShutdownOnExit(isShutdownOnExit());
+            return channel;
         }
     }
 
@@ -132,7 +149,7 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
         }
 
         final ForwardingFilter filter = 
getSession().getFactoryManager().getTcpipForwardingFilter();
-        if (address == null || filter == null || !filter.canConnect(address, 
getSession())) {
+        if ((address == null) || (filter == null) || 
(!filter.canConnect(address, getSession()))) {
             super.close(true);
             f.setException(new 
OpenChannelException(SshConstants.SSH_OPEN_ADMINISTRATIVELY_PROHIBITED, 
"Connection denied"));
             return f;
@@ -205,16 +222,33 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
         // We also need to dispose of the connector, but unfortunately we
         // are being invoked by the connector thread or the connector's
         // own processor thread.  Disposing of the connector within either
-        // causes deadlock.  Instead create a new thread to dispose of the
+        // causes deadlock.  Instead create a thread to dispose of the
         // connector in the background.
-        //
-        new Thread("TcpIpServerChannel-ConnectorCleanup") {
-            @SuppressWarnings("synthetic-access")
-            @Override
-            public void run() {
-                connector.close(true);
-            }
-        }.start();
+
+        ExecutorService service = getExecutorService();
+        // allocate a temporary executor service if none provided
+        final ExecutorService executors = (service == null)
+                                        ? 
ThreadUtils.newSingleThreadExecutor("TcpIpServerChannel-ConnectorCleanup[" + 
getSession() + "]")
+                                        : service
+                                        ;
+        // shut down the temporary executor service if we had to create it
+        final boolean shutdown = (executors == service) ? isShutdownOnExit() : 
true;
+        executors.submit(new Runnable() {
+                @SuppressWarnings("synthetic-access")
+                @Override
+                public void run() {
+                    try {
+                        connector.close(true);
+                    } finally {
+                        if ((executors != null) && (!executors.isShutdown()) 
&& shutdown) {
+                            Collection<Runnable> runners = 
executors.shutdownNow();
+                            if (log.isDebugEnabled()) {
+                                log.debug("destroy() - shutdown executor 
service - runners count=" + ((runners == null) ? 0 : runners.size()));
+                            }
+                        }
+                    }
+                }
+            });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java 
b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
index cffa5c2..5b79870 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
@@ -90,22 +90,23 @@ public class ThreadUtils {
     /**
      * Attempts to find the most suitable {@link ClassLoader} as follows:</BR>
      * <UL>
-     * <LI>
-     * Check the {@link Thread#getContextClassLoader()} value
-     * </LI>
-     * <p/>
-     * <LI>
-     * If no thread context class loader then check the anchor
-     * class (if given) for its class loader
-     * </LI>
-     * <p/>
-     * <LI>
-     * If still no loader available, then use {@link 
ClassLoader#getSystemClassLoader()}
-     * </LI>
+     *      <LI>
+     *      Check the {@link Thread#getContextClassLoader()} value
+     *      </LI>
+     *      
+     *      <LI>
+     *      If no thread context class loader then check the anchor
+     *      class (if given) for its class loader
+     *      </LI>
+     *      
+     *      <LI>
+     *      If still no loader available, then use {@link 
ClassLoader#getSystemClassLoader()}
+     *      </LI>
      * </UL>
-     *
-     * @param anchor
-     * @return
+     * @param anchor The anchor {@link Class} to use if no current thread
+     * context class loader
+     * - ignored if {@code null}
+     * @return The resolved {@link ClassLoader}
      */
     public static ClassLoader resolveDefaultClassLoader(Class<?> anchor) {
         Thread thread = Thread.currentThread();
@@ -125,38 +126,27 @@ public class ThreadUtils {
         return cl;
     }
 
-    public static ExecutorService newFixedThreadPool(
-            String poolName,
-            int nThreads
-    ) {
+    public static ExecutorService newFixedThreadPool(String poolName, int 
nThreads) {
         return new ThreadPoolExecutor(nThreads, nThreads,
-                0L, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<Runnable>(),
-                new SshdThreadFactory(poolName),
-                new ThreadPoolExecutor.CallerRunsPolicy());
+                                      0L, TimeUnit.MILLISECONDS,
+                                      new LinkedBlockingQueue<Runnable>(),
+                                      new SshdThreadFactory(poolName),
+                                      new 
ThreadPoolExecutor.CallerRunsPolicy());
     }
 
-    public static ExecutorService newCachedThreadPool(
-            String poolName
-    ) {
+    public static ExecutorService newCachedThreadPool(String poolName) {
         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
-                60L, TimeUnit.SECONDS,
-                new SynchronousQueue<Runnable>(),
-                new SshdThreadFactory(poolName),
-                new ThreadPoolExecutor.CallerRunsPolicy());
+                                      60L, TimeUnit.SECONDS,
+                                      new SynchronousQueue<Runnable>(),
+                                      new SshdThreadFactory(poolName),
+                                      new 
ThreadPoolExecutor.CallerRunsPolicy());
     }
 
-    public static ScheduledExecutorService newSingleThreadScheduledExecutor(
-            String poolName
-    ) {
-        return new ScheduledThreadPoolExecutor(
-                1,
-                new SshdThreadFactory(poolName));
+    public static ScheduledExecutorService 
newSingleThreadScheduledExecutor(String poolName) {
+        return new ScheduledThreadPoolExecutor(1, new 
SshdThreadFactory(poolName));
     }
 
-    public static ExecutorService newSingleThreadExecutor(
-            String poolName
-    ) {
+    public static ExecutorService newSingleThreadExecutor(String poolName) {
         return newFixedThreadPool(poolName, 1);
     }
 
@@ -168,18 +158,21 @@ public class ThreadUtils {
 
         public SshdThreadFactory(String name) {
             SecurityManager s = System.getSecurityManager();
-            group = (s != null) ? s.getThreadGroup() :
-                    Thread.currentThread().getThreadGroup();
-            namePrefix = "sshd-" + name + "-thread-";
+            ThreadGroup parentGroup = (s != null) ? s.getThreadGroup() : 
Thread.currentThread().getThreadGroup();
+            String effectiveName = name.replace(' ', '-');
+            group = new ThreadGroup(parentGroup, "sshd-" + effectiveName + 
"-group");
+            namePrefix = "sshd-" + effectiveName + "-thread-";
         }
 
         @Override
         public Thread newThread(Runnable r) {
             Thread t = new Thread(group, r, namePrefix + 
threadNumber.getAndIncrement(), 0);
-            if (t.isDaemon())
-                t.setDaemon(false);
-            if (t.getPriority() != Thread.NORM_PRIORITY)
+            if (!t.isDaemon()) {
+                t.setDaemon(true);
+            }
+            if (t.getPriority() != Thread.NORM_PRIORITY) {
                 t.setPriority(Thread.NORM_PRIORITY);
+            }
             return t;
         }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java 
b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java
index 2cd4638..baf6ca2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java
@@ -194,7 +194,7 @@ public class ScpCommand extends AbstractLoggingBean 
implements Command, Runnable
 
         pendingFuture = null;
 
-        if ((executors != null) && shutdownExecutor) {
+        if ((executors != null) && (!executors.isShutdown()) && 
shutdownExecutor) {
             Collection<Runnable> runners = executors.shutdownNow();
             if (log.isDebugEnabled()) {
                 log.debug("destroy() - shutdown executor service - runners 
count=" + ((runners == null) ? 0 : runners.size()));

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java 
b/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java
index b57705d..ecc8d29 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java
@@ -2139,7 +2139,7 @@ public class SftpSubsystem extends AbstractLoggingBean 
implements Command, Runna
 
             pendingFuture = null;
 
-            if ((executors != null) && shutdownExecutor) {
+            if ((executors != null) && (!executors.isShutdown()) && 
shutdownExecutor) {
                 Collection<Runnable> runners = executors.shutdownNow();
                 if (log.isDebugEnabled()) {
                     log.debug("destroy() - shutdown executor service - runners 
count=" + ((runners == null) ? 0 : runners.size()));

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/test/java/org/apache/sshd/AgentTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/AgentTest.java 
b/sshd-core/src/test/java/org/apache/sshd/AgentTest.java
index d21828e..d7e4fbc 100644
--- a/sshd-core/src/test/java/org/apache/sshd/AgentTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/AgentTest.java
@@ -19,8 +19,6 @@
 package org.apache.sshd;
 
 import static org.apache.sshd.util.Utils.createTestKeyPairProvider;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assume.assumeThat;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -49,11 +47,14 @@ import org.junit.Assume;
 import org.junit.Test;
 
 public class AgentTest extends BaseTestSupport {
+    public AgentTest() {
+        super();
+    }
 
     @Test
-    public void testAgent() throws Exception {
+    public void testAgentServer() throws Exception {
         // TODO: revisit this test to work without BC
-        Assume.assumeTrue("BoncyCastle not registered", 
SecurityUtils.isBouncyCastleRegistered());
+        Assume.assumeTrue("BouncyCastle not registered", 
SecurityUtils.isBouncyCastleRegistered());
 
         try(AgentServer agent = new AgentServer()) {
             String authSocket;
@@ -63,23 +64,23 @@ public class AgentTest extends BaseTestSupport {
                 // the native library is not available, so these tests should 
be skipped
                 authSocket = null;
             }
-            assumeThat(authSocket, notNullValue());
+            Assume.assumeTrue("Native library N/A", authSocket != null);
     
             try(SshAgent client = new AgentClient(authSocket)) {
                 List<SshAgent.Pair<PublicKey, String>> keys = 
client.getIdentities();
-                assertNotNull(keys);
-                assertEquals(0, keys.size());
+                assertNotNull("No initial identities", keys);
+                assertEquals("Unexpected initial identities size", 0, 
keys.size());
         
                 KeyPair k = 
Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
                 client.addIdentity(k, "");
                 keys = client.getIdentities();
-                assertNotNull(keys);
-                assertEquals(1, keys.size());
+                assertNotNull("No registered identities after add", keys);
+                assertEquals("Mismatched registered keys size", 1, 
keys.size());
         
                 client.removeIdentity(k.getPublic());
                 keys = client.getIdentities();
-                assertNotNull(keys);
-                assertEquals(0, keys.size());
+                assertNotNull("No registered identities after remove", keys);
+                assertEquals("Registered keys size not empty", 0, keys.size());
         
                 client.removeAllIdentities();
             }    
@@ -89,14 +90,14 @@ public class AgentTest extends BaseTestSupport {
     @Test
     public void testAgentForwarding() throws Exception {
         // TODO: revisit this test to work without BC
-        Assume.assumeTrue("BoncyCastle not registered", 
SecurityUtils.isBouncyCastleRegistered());
+        Assume.assumeTrue("BouncyCastle not registered", 
SecurityUtils.isBouncyCastleRegistered());
 
         TestEchoShellFactory shellFactory = new TestEchoShellFactory();
         ProxyAgentFactory agentFactory = new ProxyAgentFactory();
         LocalAgentFactory localAgentFactory = new LocalAgentFactory();
-
+        String username = getCurrentTestName();
         KeyPair pair = 
createTestKeyPairProvider("dsaprivkey.pem").loadKey(KeyPairProvider.SSH_DSS);
-        localAgentFactory.getAgent().addIdentity(pair, "smx");
+        localAgentFactory.getAgent().addIdentity(pair, username);
 
         try(SshServer sshd1 = SshServer.setUpDefaultServer()) {
             sshd1.setKeyPairProvider(Utils.createTestHostKeyProvider());
@@ -120,8 +121,8 @@ public class AgentTest extends BaseTestSupport {
                     client1.setAgentFactory(localAgentFactory);
                     client1.start();
                     
-                    try(ClientSession session1 = client1.connect("smx", 
"localhost", port1).await().getSession()) {
-                        session1.auth().verify(5L, TimeUnit.SECONDS);
+                    try(ClientSession session1 = client1.connect(username, 
"localhost", port1).await().getSession()) {
+                        session1.auth().verify(10L, TimeUnit.SECONDS);
 
                         try(ChannelShell channel1 = 
session1.createShellChannel();
                             ByteArrayOutputStream out = new 
ByteArrayOutputStream();
@@ -145,7 +146,7 @@ public class AgentTest extends BaseTestSupport {
                                     
client2.getProperties().putAll(shellFactory.shell.getEnvironment().getEnv());
                                     client2.start();
                                     
-                                    try(ClientSession session2 = 
client2.connect("smx", "localhost", port2).await().getSession()) {
+                                    try(ClientSession session2 = 
client2.connect(username, "localhost", port2).await().getSession()) {
                                         session2.auth().verify(5L, 
TimeUnit.SECONDS);
 
                                         try(ChannelShell channel2 = 
session2.createShellChannel()) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-git/src/main/java/org/apache/sshd/git/pack/GitPackCommand.java
----------------------------------------------------------------------
diff --git 
a/sshd-git/src/main/java/org/apache/sshd/git/pack/GitPackCommand.java 
b/sshd-git/src/main/java/org/apache/sshd/git/pack/GitPackCommand.java
index d8b4c80..6f09d34 100644
--- a/sshd-git/src/main/java/org/apache/sshd/git/pack/GitPackCommand.java
+++ b/sshd-git/src/main/java/org/apache/sshd/git/pack/GitPackCommand.java
@@ -54,10 +54,12 @@ public class GitPackCommand implements Command, Runnable {
         this.command = command;
     }
 
+    @Override
     public void setInputStream(InputStream in) {
         this.in = in;
     }
 
+    @Override
     public void setOutputStream(OutputStream out) {
         this.out = out;
         if (out instanceof ChannelOutputStream) {
@@ -65,6 +67,7 @@ public class GitPackCommand implements Command, Runnable {
         }
     }
 
+    @Override
     public void setErrorStream(OutputStream err) {
         this.err = err;
         if (err instanceof ChannelOutputStream) {
@@ -72,14 +75,19 @@ public class GitPackCommand implements Command, Runnable {
         }
     }
 
+    @Override
     public void setExitCallback(ExitCallback callback) {
         this.callback = callback;
     }
 
+    @Override
     public void start(Environment env) throws IOException {
-        new Thread(this).start();
+        Thread  thread=new Thread(this);
+        thread.setDaemon(true);
+        thread.start();
     }
 
+    @Override
     public void run() {
         try {
             List<String> strs = parseDelimitedString(command, " ", true);
@@ -114,6 +122,7 @@ public class GitPackCommand implements Command, Runnable {
         }
     }
 
+    @Override
     public void destroy() {
         //To change body of implemented methods use File | Settings | File 
Templates.
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-git/src/main/java/org/apache/sshd/git/pgm/GitPgmCommand.java
----------------------------------------------------------------------
diff --git a/sshd-git/src/main/java/org/apache/sshd/git/pgm/GitPgmCommand.java 
b/sshd-git/src/main/java/org/apache/sshd/git/pgm/GitPgmCommand.java
index 22a2f11..210a7cf 100644
--- a/sshd-git/src/main/java/org/apache/sshd/git/pgm/GitPgmCommand.java
+++ b/sshd-git/src/main/java/org/apache/sshd/git/pgm/GitPgmCommand.java
@@ -48,10 +48,12 @@ public class GitPgmCommand implements Command, Runnable {
         this.command = command;
     }
 
+    @Override
     public void setInputStream(InputStream in) {
         this.in = in;
     }
 
+    @Override
     public void setOutputStream(OutputStream out) {
         this.out = out;
         if (out instanceof ChannelOutputStream) {
@@ -59,6 +61,7 @@ public class GitPgmCommand implements Command, Runnable {
         }
     }
 
+    @Override
     public void setErrorStream(OutputStream err) {
         this.err = err;
         if (err instanceof ChannelOutputStream) {
@@ -66,14 +69,19 @@ public class GitPgmCommand implements Command, Runnable {
         }
     }
 
+    @Override
     public void setExitCallback(ExitCallback callback) {
         this.callback = callback;
     }
 
+    @Override
     public void start(Environment env) throws IOException {
-        new Thread(this).start();
+        Thread  thread=new Thread(this);
+        thread.setDaemon(true);
+        thread.start();
     }
 
+    @Override
     public void run() {
         try {
             List<String> strs = parseDelimitedString(command, " ", true);
@@ -104,6 +112,7 @@ public class GitPgmCommand implements Command, Runnable {
         }
     }
 
+    @Override
     public void destroy() {
         //To change body of implemented methods use File | Settings | File 
Templates.
     }

Reply via email to