Repository: mina-sshd Updated Branches: refs/heads/master f8d842b1e -> 330d17c81
[SSHD-480] Replace all "new Thread(...)" calls with "ExecutorService#submit" Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/009d832d Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/009d832d Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/009d832d Branch: refs/heads/master Commit: 009d832d4a1316bcfc1e2921c386d078dcb3a3ce Parents: f8d842b Author: Lyor Goldstein <lgoldst...@vmware.com> Authored: Mon Jun 1 16:01:21 2015 +0300 Committer: Lyor Goldstein <lgoldst...@vmware.com> Committed: Mon Jun 1 16:01:21 2015 +0300 ---------------------------------------------------------------------- src/tomcat-apr-5.5.23-sources.jar | Bin 0 -> 25227 bytes .../main/java/org/apache/sshd/SshClient.java | 9 +- .../main/java/org/apache/sshd/SshServer.java | 9 +- .../java/org/apache/sshd/agent/SshAgent.java | 3 +- .../sshd/agent/common/AbstractAgentClient.java | 28 ++--- .../sshd/agent/common/AbstractAgentProxy.java | 96 +++++++++++--- .../apache/sshd/agent/common/AgentDelegate.java | 5 + .../sshd/agent/local/AgentForwardedChannel.java | 15 ++- .../org/apache/sshd/agent/local/AgentImpl.java | 32 +++-- .../org/apache/sshd/agent/unix/AgentClient.java | 45 +++++-- .../org/apache/sshd/agent/unix/AgentServer.java | 90 ++++++++++--- .../sshd/agent/unix/AgentServerProxy.java | 125 +++++++++++++------ .../sshd/agent/unix/ChannelAgentForwarding.java | 90 +++++++++---- .../sshd/agent/unix/UnixAgentFactory.java | 54 +++++++- .../client/channel/AbstractClientChannel.java | 3 +- .../sshd/client/channel/ChannelSession.java | 56 ++++++--- .../sshd/client/channel/ChannelSubsystem.java | 7 +- .../sshd/common/channel/AbstractChannel.java | 41 +++++- .../sshd/common/forward/TcpipServerChannel.java | 58 +++++++-- .../sshd/common/util/threads/ThreadUtils.java | 83 ++++++------ .../apache/sshd/server/command/ScpCommand.java | 2 +- .../apache/sshd/server/sftp/SftpSubsystem.java | 2 +- .../test/java/org/apache/sshd/AgentTest.java | 35 +++--- .../apache/sshd/git/pack/GitPackCommand.java | 11 +- .../org/apache/sshd/git/pgm/GitPgmCommand.java | 11 +- 25 files changed, 665 insertions(+), 245 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/src/tomcat-apr-5.5.23-sources.jar ---------------------------------------------------------------------- diff --git a/src/tomcat-apr-5.5.23-sources.jar b/src/tomcat-apr-5.5.23-sources.jar new file mode 100644 index 0000000..5f6fd04 Binary files /dev/null and b/src/tomcat-apr-5.5.23-sources.jar differ http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/SshClient.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/SshClient.java b/sshd-core/src/main/java/org/apache/sshd/SshClient.java index 4f74d61..7e13195 100644 --- a/sshd-core/src/main/java/org/apache/sshd/SshClient.java +++ b/sshd-core/src/main/java/org/apache/sshd/SshClient.java @@ -240,9 +240,12 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa public void run() { connector = null; ioServiceFactory = null; - if (shutdownExecutor && executor != null) { - executor.shutdownNow(); - executor = null; + if (shutdownExecutor && (executor != null) && (!executor.isShutdown())) { + try { + executor.shutdownNow(); + } finally { + executor = null; + } } } }) http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/SshServer.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/SshServer.java b/sshd-core/src/main/java/org/apache/sshd/SshServer.java index 87ce889..024fc76 100644 --- a/sshd-core/src/main/java/org/apache/sshd/SshServer.java +++ b/sshd-core/src/main/java/org/apache/sshd/SshServer.java @@ -323,9 +323,12 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa public void run() { acceptor = null; ioServiceFactory = null; - if (shutdownExecutor && executor != null) { - executor.shutdownNow(); - executor = null; + if (shutdownExecutor && (executor != null) && (!executor.isShutdown())) { + try { + executor.shutdownNow(); + } finally { + executor = null; + } } } }) http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/SshAgent.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/SshAgent.java b/sshd-core/src/main/java/org/apache/sshd/agent/SshAgent.java index b3b2ff5..4303721 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/SshAgent.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/SshAgent.java @@ -18,7 +18,6 @@ */ package org.apache.sshd.agent; -import java.io.Closeable; import java.io.IOException; import java.security.KeyPair; import java.security.PublicKey; @@ -27,7 +26,7 @@ import java.util.List; /** * SSH key agent server */ -public interface SshAgent extends Closeable { +public interface SshAgent extends java.nio.channels.Channel { public static final String SSH_AUTHSOCKET_ENV_NAME = "SSH_AUTH_SOCK"; http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentClient.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentClient.java b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentClient.java index 44d6c23..9e48c7d 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentClient.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentClient.java @@ -36,6 +36,7 @@ import org.apache.sshd.agent.SshAgent; import org.apache.sshd.common.util.AbstractLoggingBean; import org.apache.sshd.common.util.KeyUtils; import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.buffer.BufferUtils; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; public abstract class AbstractAgentClient extends AbstractLoggingBean { @@ -43,7 +44,7 @@ public abstract class AbstractAgentClient extends AbstractLoggingBean { private final Buffer buffer = new ByteArrayBuffer(); private final SshAgent agent; - public AbstractAgentClient(SshAgent agent) { + protected AbstractAgentClient(SshAgent agent) { this.agent = agent; } @@ -76,8 +77,7 @@ public abstract class AbstractAgentClient extends AbstractLoggingBean { protected void process(Buffer req, Buffer rep) throws Exception { int cmd = req.getByte(); switch (cmd) { - case SSH2_AGENTC_REQUEST_IDENTITIES: - { + case SSH2_AGENTC_REQUEST_IDENTITIES: { List<SshAgent.Pair<PublicKey,String>> keys = agent.getIdentities(); rep.putByte(SSH2_AGENT_IDENTITIES_ANSWER); rep.putInt(keys.size()); @@ -87,11 +87,14 @@ public abstract class AbstractAgentClient extends AbstractLoggingBean { } break; } - case SSH2_AGENTC_SIGN_REQUEST: - { + case SSH2_AGENTC_SIGN_REQUEST: { PublicKey key = req.getPublicKey(); byte[] data = req.getBytes(); int flags = req.getInt(); + if (log.isDebugEnabled()) { + log.debug("SSH2_AGENTC_SIGN_REQUEST key={}, flags=0x{}, data={}", + key.getAlgorithm(), Integer.toHexString(flags), BufferUtils.printHex(':', data)); + } Buffer sig = new ByteArrayBuffer(); sig.putString(KeyUtils.getKeyType(key)); sig.putBytes(agent.sign(key, data)); @@ -99,30 +102,27 @@ public abstract class AbstractAgentClient extends AbstractLoggingBean { rep.putBytes(sig.array(), sig.rpos(), sig.available()); break; } - case SSH2_AGENTC_ADD_IDENTITY: - { + case SSH2_AGENTC_ADD_IDENTITY: { agent.addIdentity(req.getKeyPair(), req.getString()); rep.putByte(SSH_AGENT_SUCCESS); break; } - case SSH2_AGENTC_REMOVE_IDENTITY: - { + case SSH2_AGENTC_REMOVE_IDENTITY: { PublicKey key = req.getPublicKey(); agent.removeIdentity(key); rep.putByte(SSH_AGENT_SUCCESS); break; } case SSH2_AGENTC_REMOVE_ALL_IDENTITIES: - { agent.removeAllIdentities(); rep.putByte(SSH_AGENT_SUCCESS); break; - } default: - { + if (log.isDebugEnabled()) { + log.debug("Unknown command: {}", Integer.valueOf(cmd)); + } + rep.putByte(SSH2_AGENT_FAILURE); - break; - } } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java index 7af4ee2..61df6ae 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java @@ -31,14 +31,46 @@ import java.io.IOException; import java.security.KeyPair; import java.security.PublicKey; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.sshd.agent.SshAgent; import org.apache.sshd.common.SshException; +import org.apache.sshd.common.util.AbstractLoggingBean; +import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.buffer.BufferUtils; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer; -public abstract class AbstractAgentProxy implements SshAgent { +public abstract class AbstractAgentProxy extends AbstractLoggingBean implements SshAgent, ExecutorServiceConfigurer { + private ExecutorService executor; + private boolean shutdownExecutor; + + protected AbstractAgentProxy() { + super(); + } + + @Override + public ExecutorService getExecutorService() { + return executor; + } + + @Override + public void setExecutorService(ExecutorService service) { + executor = service; + } + + @Override + public boolean isShutdownOnExit() { + return shutdownExecutor; + } + + @Override + public void setShutdownOnExit(boolean shutdown) { + shutdownExecutor = shutdown; + } @Override public List<Pair<PublicKey, String>> getIdentities() throws IOException { @@ -46,13 +78,15 @@ public abstract class AbstractAgentProxy implements SshAgent { buffer = request(prepare(buffer)); int type = buffer.getByte(); if (type != SSH2_AGENT_IDENTITIES_ANSWER) { - throw new SshException("SSH agent failure"); + throw new SshException("Bad agent identities answer: " + type); } + int nbIdentities = buffer.getInt(); if (nbIdentities > 1024) { - throw new SshException("SSH agent failure"); + throw new SshException("Bad identities count: " + nbIdentities); } - List<Pair<PublicKey, String>> keys = new ArrayList<Pair<PublicKey, String>>(); + + List<Pair<PublicKey, String>> keys = new ArrayList<Pair<PublicKey, String>>(nbIdentities); for (int i = 0; i < nbIdentities; i++) { PublicKey key = buffer.getPublicKey(); keys.add(new Pair<PublicKey, String>(key, buffer.getString())); @@ -67,12 +101,19 @@ public abstract class AbstractAgentProxy implements SshAgent { buffer.putBytes(data); buffer.putInt(0); buffer = request(prepare(buffer)); - if (buffer.getByte() != SSH2_AGENT_SIGN_RESPONSE) { - throw new SshException("SSH agent failure"); + + byte responseType = buffer.getByte(); + if (responseType != SSH2_AGENT_SIGN_RESPONSE) { + throw new SshException("Bad signing response type: " + (responseType & 0xFF)); } Buffer buf = new ByteArrayBuffer(buffer.getBytes()); - buf.getString(); // algo - return buf.getBytes(); + String algorithm = buf.getString(); + byte[] signature = buf.getBytes(); + if (log.isDebugEnabled()) { + log.debug("sign(" + algorithm + "): " + BufferUtils.printHex(':', signature)); + } + + return signature; } @Override @@ -80,9 +121,15 @@ public abstract class AbstractAgentProxy implements SshAgent { Buffer buffer = createBuffer(SSH2_AGENTC_ADD_IDENTITY); buffer.putKeyPair(key); buffer.putString(comment); + if (log.isDebugEnabled()) { + log.debug("addIdentity(" + comment + "): " + key.getPublic().getAlgorithm()); + } buffer = request(prepare(buffer)); - if (buffer.available() != 1 || buffer.getByte() != SSH_AGENT_SUCCESS) { - throw new SshException("SSH agent failure"); + + int available = buffer.available(); + byte response = (available >= 1) ? buffer.getByte() : -1; + if ((available != 1) || (response != SSH_AGENT_SUCCESS)) { + throw new SshException("Bad addIdentity response (" + (response & 0xFF) + ") - available=" + available); } } @@ -90,24 +137,43 @@ public abstract class AbstractAgentProxy implements SshAgent { public void removeIdentity(PublicKey key) throws IOException { Buffer buffer = createBuffer(SSH2_AGENTC_REMOVE_IDENTITY); buffer.putPublicKey(key); + if (log.isDebugEnabled()) { + log.debug("removeIdentity: " + key.getAlgorithm()); + } + buffer = request(prepare(buffer)); - if (buffer.available() != 1 || buffer.getByte() != SSH_AGENT_SUCCESS) { - throw new SshException("SSH agent failure"); + + int available = buffer.available(); + byte response = (available >= 1) ? buffer.getByte() : -1; + if ((available != 1) || (response != SSH_AGENT_SUCCESS)) { + throw new SshException("Bad removeIdentity response (" + (response & 0xFF) + ") - available=" + available); } } @Override public void removeAllIdentities() throws IOException { Buffer buffer = createBuffer(SSH2_AGENTC_REMOVE_ALL_IDENTITIES); + if (log.isDebugEnabled()) { + log.debug("removeAllIdentities"); + } buffer = request(prepare(buffer)); - if (buffer.available() != 1 || buffer.getByte() != SSH_AGENT_SUCCESS) { - throw new SshException("SSH agent failure"); + + int available = buffer.available(); + byte response = (available >= 1) ? buffer.getByte() : -1; + if ((available != 1) || (response != SSH_AGENT_SUCCESS)) { + throw new SshException("Bad removeAllIdentities response (" + (response & 0xFF) + ") - available=" + available); } } @Override public void close() throws IOException { - // nothing + ExecutorService service = getExecutorService(); + if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) { + Collection<?> runners = service.shutdownNow(); + if (log.isDebugEnabled()) { + log.debug("close() - shutdown runners count=" + GenericUtils.size(runners)); + } + } } protected Buffer createBuffer(byte cmd) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentDelegate.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentDelegate.java b/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentDelegate.java index 26df439..ed74fb6 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentDelegate.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentDelegate.java @@ -34,6 +34,11 @@ public class AgentDelegate implements SshAgent { } @Override + public boolean isOpen() { + return agent.isOpen(); + } + + @Override public void close() throws IOException { // ignored } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java index ea0779a..0318ac1 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.sshd.agent.SshAgent; import org.apache.sshd.agent.common.AbstractAgentProxy; @@ -42,15 +43,25 @@ public class AgentForwardedChannel extends AbstractClientChannel { public SshAgent getAgent() { return new AbstractAgentProxy() { + private final AtomicBoolean open = new AtomicBoolean(true); + + @Override + public boolean isOpen() { + return open.get(); + } + @Override protected Buffer request(Buffer buffer) throws IOException { return AgentForwardedChannel.this.request(buffer); } + @Override public void close() throws IOException { - AgentForwardedChannel.this.close(false); + if (open.getAndSet(false)) { + AgentForwardedChannel.this.close(false); + super.close(); + } } - }; } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentImpl.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentImpl.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentImpl.java index d4c6850..5ccea64 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentImpl.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentImpl.java @@ -28,6 +28,7 @@ import java.security.interfaces.RSAPublicKey; import java.security.spec.ECParameterSpec; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.sshd.agent.SshAgent; import org.apache.sshd.common.Signature; @@ -40,13 +41,23 @@ import org.apache.sshd.common.signature.BuiltinSignatures; public class AgentImpl implements SshAgent { private final List<Pair<KeyPair, String>> keys = new ArrayList<Pair<KeyPair, String>>(); - private boolean closed; + private final AtomicBoolean open = new AtomicBoolean(true); + + public AgentImpl() { + super(); + } + + @Override + public boolean isOpen() { + return open.get(); + } @Override public List<Pair<PublicKey, String>> getIdentities() throws IOException { - if (closed) { + if (!isOpen()) { throw new SshException("Agent closed"); } + List<Pair<PublicKey, String>> pks = new ArrayList<Pair<PublicKey, String>>(); for (Pair<KeyPair, String> kp : keys) { pks.add(new Pair<PublicKey, String>(kp.getFirst().getPublic(), kp.getSecond())); @@ -56,15 +67,17 @@ public class AgentImpl implements SshAgent { @Override public byte[] sign(PublicKey key, byte[] data) throws IOException { - if (closed) { + if (!isOpen()) { throw new SshException("Agent closed"); } + Pair<KeyPair, String> kp = getKeyPair(keys, key); if (kp == null) { throw new SshException("Key not found"); } + try { - Signature verif; + final Signature verif; if (kp.getFirst().getPublic() instanceof DSAPublicKey) { verif = BuiltinSignatures.dsa.create(); } else if (kp.getFirst().getPublic() instanceof ECPublicKey) { @@ -87,7 +100,7 @@ public class AgentImpl implements SshAgent { @Override public void addIdentity(KeyPair key, String comment) throws IOException { - if (closed) { + if (!isOpen()) { throw new SshException("Agent closed"); } keys.add(new Pair<KeyPair, String>(key, comment)); @@ -95,7 +108,7 @@ public class AgentImpl implements SshAgent { @Override public void removeIdentity(PublicKey key) throws IOException { - if (closed) { + if (!isOpen()) { throw new SshException("Agent closed"); } Pair<KeyPair, String> kp = getKeyPair(keys, key); @@ -107,7 +120,7 @@ public class AgentImpl implements SshAgent { @Override public void removeAllIdentities() throws IOException { - if (closed) { + if (!isOpen()) { throw new SshException("Agent closed"); } keys.clear(); @@ -115,8 +128,9 @@ public class AgentImpl implements SshAgent { @Override public void close() throws IOException { - closed = true; - keys.clear(); + if (open.getAndSet(false)) { + keys.clear(); + } } protected static SshAgent.Pair<KeyPair, String> getKeyPair(List<SshAgent.Pair<KeyPair, String>> keys, PublicKey key) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java index a8a5271..cd192be 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java @@ -22,11 +22,15 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.sshd.agent.common.AbstractAgentProxy; import org.apache.sshd.common.SshException; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.tomcat.jni.Local; import org.apache.tomcat.jni.Pool; import org.apache.tomcat.jni.Socket; @@ -42,11 +46,20 @@ public class AgentClient extends AbstractAgentProxy implements Runnable { private final long handle; private final Buffer receiveBuffer; private final Queue<Buffer> messages; - private boolean closed; + private Future<?> pumper; + private final AtomicBoolean open = new AtomicBoolean(true); public AgentClient(String authSocket) throws IOException { + this(authSocket, null, false); + } + + public AgentClient(String authSocket, ExecutorService executor, boolean shutdownOnExit) throws IOException { + this.authSocket = authSocket; + + setExecutorService((executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentClient[" + authSocket + "]") : executor); + setShutdownOnExit((executor == null) ? true : shutdownOnExit); + try { - this.authSocket = authSocket; pool = Pool.create(AprLibrary.getInstance().getRootPool()); handle = Local.create(authSocket, pool); int result = Local.connect(handle, 0); @@ -55,7 +68,9 @@ public class AgentClient extends AbstractAgentProxy implements Runnable { } receiveBuffer = new ByteArrayBuffer(); messages = new ArrayBlockingQueue<Buffer>(10); - new Thread(this).start(); + + ExecutorService service = getExecutorService(); + pumper = service.submit(this); } catch (IOException e) { throw e; } catch (Exception e) { @@ -64,10 +79,15 @@ public class AgentClient extends AbstractAgentProxy implements Runnable { } @Override + public boolean isOpen() { + return open.get(); + } + + @Override public void run() { try { byte[] buf = new byte[1024]; - while (!closed) { + while (isOpen()) { int result = Socket.recv(handle, buf, 0, buf.length); if (result < Status.APR_SUCCESS) { throwException(result); @@ -75,14 +95,16 @@ public class AgentClient extends AbstractAgentProxy implements Runnable { messageReceived(new ByteArrayBuffer(buf, 0, result)); } } catch (Exception e) { - if (!closed) { - e.printStackTrace(); + if (isOpen()) { + log.warn(e.getClass().getSimpleName() + " while still open: " + e.getMessage()); } } finally { try { close(); } catch(IOException e) { - e.printStackTrace(); + if (log.isDebugEnabled()) { + log.debug(e.getClass().getSimpleName() + " while closing: " + e.getMessage()); + } } } } @@ -111,10 +133,15 @@ public class AgentClient extends AbstractAgentProxy implements Runnable { @Override public void close() throws IOException { - if (!closed) { - closed = true; + if (open.getAndSet(false)) { Socket.close(handle); } + + if ((pumper != null) && isShutdownOnExit() && (!pumper.isDone())) { + pumper.cancel(true); + } + + super.close(); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java index 52146da..f176790 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java @@ -20,13 +20,19 @@ package org.apache.sshd.agent.unix; import java.io.Closeable; import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.sshd.agent.SshAgent; import org.apache.sshd.agent.common.AbstractAgentClient; import org.apache.sshd.agent.local.AgentImpl; import org.apache.sshd.common.util.AbstractLoggingBean; +import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; +import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.tomcat.jni.Local; import org.apache.tomcat.jni.Pool; import org.apache.tomcat.jni.Socket; @@ -36,26 +42,44 @@ import org.apache.tomcat.jni.Status; /** * A server for an SSH Agent */ -public class AgentServer extends AbstractLoggingBean implements Closeable { +public class AgentServer extends AbstractLoggingBean implements Closeable, ExecutorServiceCarrier { private final SshAgent agent; + private final ExecutorService service; + private final boolean shutdownExecutor; + private Future<?> agentThread; private String authSocket; private long pool; private long handle; - private Thread thread; public AgentServer() { - this(new AgentImpl()); + this(null, false); } - public AgentServer(SshAgent agent) { + public AgentServer(ExecutorService executor, boolean shutdownOnExit) { + this(new AgentImpl(), executor, shutdownOnExit); + } + + public AgentServer(SshAgent agent, ExecutorService executor, boolean shutdownOnExit) { this.agent = agent; + this.service = (executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentServer[" + agent + "]") : executor; + this.shutdownExecutor = (service == executor) ? shutdownOnExit : true; } public SshAgent getAgent() { return agent; } + @Override + public ExecutorService getExecutorService() { + return service; + } + + @Override + public boolean isShutdownOnExit() { + return shutdownExecutor; + } + public String start() throws Exception { authSocket = AprLibrary.createLocalSocketAddress(); pool = Pool.create(AprLibrary.getInstance().getRootPool()); @@ -69,29 +93,56 @@ public class AgentServer extends AbstractLoggingBean implements Closeable { if (result != Status.APR_SUCCESS) { throwException(result); } - thread = new Thread() { - @SuppressWarnings("synthetic-access") - @Override - public void run() { - try { - while (true) { - long clientSock = Local.accept(handle); - Socket.timeoutSet(clientSock, 10000000); - new SshAgentSession(clientSock, agent); + + ExecutorService executor = getExecutorService(); + agentThread = executor.submit(new Runnable() { + @SuppressWarnings("synthetic-access") + @Override + public void run() { + try { + while (true) { + long clientSock = Local.accept(handle); + Socket.timeoutSet(clientSock, 10000000); // TODO make this configurable + new SshAgentSession(clientSock, agent).run(); + } + } catch (Exception e) { + log.error("Failed to run session", e); } - } catch (Exception e) { - log.error("Failed to run session", e); } - } - }; - thread.start(); + }); return authSocket; } @Override public void close() throws IOException { - agent.close(); + IOException err = null; + try { + agent.close(); + } catch(IOException e) { + err = e; + } + Socket.close(handle); + + try { + if ((agentThread != null) && (!agentThread.isDone())) { + agentThread.cancel(true); + } + } finally { + agentThread = null; + } + + ExecutorService executor = getExecutorService(); + if ((executor != null) && isShutdownOnExit() && (!executor.isShutdown())) { + Collection<?> runners = executor.shutdownNow(); + if (log.isDebugEnabled()) { + log.debug("Shut down runners count=" + GenericUtils.size(runners)); + } + } + + if (err != null) { + throw err; + } } protected static class SshAgentSession extends AbstractAgentClient implements Runnable { @@ -101,7 +152,6 @@ public class AgentServer extends AbstractLoggingBean implements Closeable { public SshAgentSession(long socket, SshAgent agent) { super(agent); this.socket = socket; - new Thread(this).start(); } @SuppressWarnings("synthetic-access") http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java index aa40241..ab84cee 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java @@ -20,13 +20,21 @@ package org.apache.sshd.agent.unix; import java.io.File; import java.io.IOException; +import java.nio.channels.Channel; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.sshd.agent.SshAgentServer; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.SshException; import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.util.AbstractLoggingBean; +import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.OsUtils; +import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; +import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.tomcat.jni.Local; import org.apache.tomcat.jni.Pool; import org.apache.tomcat.jni.Socket; @@ -35,19 +43,25 @@ import org.apache.tomcat.jni.Status; /** * The server side fake agent, acting as an agent, but actually forwarding the requests to the auth channel on the client side. */ -public class AgentServerProxy extends AbstractLoggingBean implements SshAgentServer { +public class AgentServerProxy extends AbstractLoggingBean implements SshAgentServer, ExecutorServiceCarrier, Channel { private final ConnectionService service; private final String authSocket; private final long pool; private final long handle; - private final Thread thread; - private volatile boolean closed; - private volatile boolean innerFinished; + private Future<?> piper; + private final ExecutorService pipeService; + private final boolean pipeCloseOnExit; + private final AtomicBoolean open = new AtomicBoolean(true); + private final AtomicBoolean innerFinished = new AtomicBoolean(false); //used to wake the Local.listen() JNI call private static final byte[] END_OF_STREAM_MESSAGE = new byte[] { "END_OF_STREAM".getBytes()[0] }; public AgentServerProxy(ConnectionService service) throws IOException { + this(service, null, false); + } + + public AgentServerProxy(ConnectionService service, ExecutorService executor, boolean shutdownOnExit) throws IOException { this.service = service; try { String authSocket = AprLibrary.createLocalSocketAddress(); @@ -66,40 +80,41 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer if (result != Status.APR_SUCCESS) { throwException(result); } - thread = new Thread("sshd-AgentServerProxy-PIPE-" + authSocket) { - @SuppressWarnings("synthetic-access") - @Override - public void run() { - try { - while (!closed) { - try { - long clientSock = Local.accept(handle); - if (closed) { - break; - } - Socket.timeoutSet(clientSock, 10000000); - AgentForwardedChannel channel = new AgentForwardedChannel(clientSock); - AgentServerProxy.this.service.registerChannel(channel); - OpenFuture future = channel.open().await(); - Throwable t = future.getException(); - if (t instanceof Exception) { - throw (Exception) t; - } else if (t != null) { - throw new Exception(t); - } - } catch (Exception e) { - if (!closed) { - log.info("Exchange caught in authentication forwarding", e); + + pipeService = (executor == null) ? ThreadUtils.newSingleThreadExecutor("sshd-AgentServerProxy-PIPE-" + authSocket) : executor; + pipeCloseOnExit = (executor == pipeService) ? shutdownOnExit : true; + piper = pipeService.submit(new Runnable() { + @SuppressWarnings("synthetic-access") + @Override + public void run() { + try { + while (isOpen()) { + try { + long clientSock = Local.accept(handle); + if (!isOpen()) { + break; + } + Socket.timeoutSet(clientSock, 10000000); // TODO allow to configure this + AgentForwardedChannel channel = new AgentForwardedChannel(clientSock); + AgentServerProxy.this.service.registerChannel(channel); + OpenFuture future = channel.open().await(); + Throwable t = future.getException(); + if (t instanceof Exception) { + throw (Exception) t; + } else if (t != null) { + throw new Exception(t); + } + } catch (Exception e) { + if (isOpen()) { + log.info(e.getClass().getSimpleName() + " while authentication forwarding: " + e.getMessage(), e); + } } } + } finally { + innerFinished.set(true); } - } finally { - innerFinished = true; } - } - }; - thread.setDaemon(true); - thread.start(); + }); } catch (IOException e) { throw e; } catch (Exception e) { @@ -108,25 +123,40 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer } @Override + public boolean isOpen() { + return open.get(); + } + + @Override + public ExecutorService getExecutorService() { + return pipeService; + } + + @Override + public boolean isShutdownOnExit() { + return pipeCloseOnExit; + } + + @Override public String getId() { return authSocket; } @Override public synchronized void close() throws IOException { - if (closed) { - return; + if (!open.getAndSet(false)) { + return; // already closed (or closing) } - closed = true; + final boolean isDebug = log.isDebugEnabled(); if (handle != 0) { - if (!innerFinished) { + if (!innerFinished.get()) { try { final long tmpPool = Pool.create(AprLibrary.getInstance().getRootPool()); final long tmpSocket = Local.create(authSocket, tmpPool); - long connectResult = Local.connect(tmpSocket, 0); + long connectResult = Local.connect(tmpSocket, 0L); if (connectResult != Status.APR_SUCCESS) { if (isDebug) { @@ -157,7 +187,6 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer try { if (authSocket != null) { - final File socketFile = new File(authSocket); if (socketFile.exists()) { if (socketFile.delete()) { @@ -182,6 +211,22 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer log.debug("Exception deleting the PIPE socket: " + authSocket, e); } } + + try { + if ((piper != null) && (!piper.isDone())) { + piper.cancel(true); + } + } finally { + piper = null; + } + + ExecutorService executor = getExecutorService(); + if ((executor != null) && isShutdownOnExit() && (!executor.isShutdown())) { + Collection<?> runners = executor.shutdownNow(); + if (log.isDebugEnabled()) { + log.debug("Shut down runners count=" + GenericUtils.size(runners)); + } + } } /** http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java index 1913652..5f7cb15 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java @@ -20,6 +20,9 @@ package org.apache.sshd.agent.unix; import java.io.IOException; import java.io.OutputStream; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.sshd.agent.SshAgent; import org.apache.sshd.client.future.DefaultOpenFuture; @@ -31,7 +34,10 @@ import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; +import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; +import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.sshd.server.channel.AbstractServerChannel; import org.apache.tomcat.jni.Local; import org.apache.tomcat.jni.Pool; @@ -43,7 +49,7 @@ import org.apache.tomcat.jni.Status; */ public class ChannelAgentForwarding extends AbstractServerChannel { - public static class ChannelAgentForwardingFactory implements NamedFactory<Channel> { + public static class ChannelAgentForwardingFactory implements NamedFactory<Channel>, ExecutorServiceCarrier { public static final ChannelAgentForwardingFactory INSTANCE = new ChannelAgentForwardingFactory(); public ChannelAgentForwardingFactory() { @@ -55,19 +61,35 @@ public class ChannelAgentForwarding extends AbstractServerChannel { return "auth-ag...@openssh.com"; } + @Override // user can override to provide an alternative + public ExecutorService getExecutorService() { + return null; + } + + @Override + public boolean isShutdownOnExit() { + return false; + } + @Override public Channel create() { - return new ChannelAgentForwarding(); + ChannelAgentForwarding channel = new ChannelAgentForwarding(); + channel.setExecutorService(getExecutorService()); + channel.setShutdownOnExit(isShutdownOnExit()); + return channel; } } private String authSocket; private long pool; private long handle; - private Thread thread; private OutputStream out; + private ExecutorService forwardService; + private Future<?> forwarder; + private boolean shutdownForwarder; public ChannelAgentForwarding() { + super(); } @Override @@ -82,25 +104,28 @@ public class ChannelAgentForwarding extends AbstractServerChannel { if (result != Status.APR_SUCCESS) { throwException(result); } - thread = new Thread() { - @SuppressWarnings("synthetic-access") - @Override - public void run() { - try { - byte[] buf = new byte[1024]; - while (true) { - int len = Socket.recv(handle, buf, 0, buf.length); - if (len > 0) { - out.write(buf, 0, len); - out.flush(); + + ExecutorService service = getExecutorService(); + forwardService = (service == null) ? ThreadUtils.newSingleThreadExecutor("ChannelAgentForwarding[" + authSocket + "]") : service; + shutdownForwarder = (service == forwardService) ? isShutdownOnExit() : true; + forwarder = forwardService.submit(new Runnable() { + @SuppressWarnings("synthetic-access") + @Override + public void run() { + try { + byte[] buf = new byte[1024]; + while (true) { + int len = Socket.recv(handle, buf, 0, buf.length); + if (len > 0) { + out.write(buf, 0, len); + out.flush(); + } } + } catch (IOException e) { + close(true); } - } catch (IOException e) { - close(true); } - } - }; - thread.start(); + }); f.setOpened(); } catch (Exception e) { @@ -118,8 +143,27 @@ public class ChannelAgentForwarding extends AbstractServerChannel { super.close(true); // We also need to close the socket. - // Socket.close(handle); + + try { + if ((forwarder != null) && (!forwarder.isDone())) { + forwarder.cancel(true); + } + } finally { + forwarder = null; + } + + try { + if ((forwardService != null) && shutdownForwarder) { + Collection<?> runners = forwardService.shutdownNow(); + if (log.isDebugEnabled()) { + log.debug("Shut down runners count=" + GenericUtils.size(runners)); + } + } + } finally { + forwardService = null; + shutdownForwarder = false; + } } @Override @@ -157,9 +201,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel { * @param code APR error code * @throws java.io.IOException the produced exception for the given APR error number */ - private void throwException(int code) throws IOException { - throw new IOException( - org.apache.tomcat.jni.Error.strerror(-code) + - " (code: " + code + ")"); + private static void throwException(int code) throws IOException { + throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")"); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java index 17e8160..61d5a28 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java @@ -19,6 +19,7 @@ package org.apache.sshd.agent.unix; import java.io.IOException; +import java.util.concurrent.ExecutorService; import org.apache.sshd.agent.SshAgent; import org.apache.sshd.agent.SshAgentFactory; @@ -31,13 +32,57 @@ import org.apache.sshd.common.Session; import org.apache.sshd.common.SshException; import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.util.GenericUtils; +import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer; import org.apache.sshd.server.session.ServerSession; -public class UnixAgentFactory implements SshAgentFactory { +public class UnixAgentFactory implements SshAgentFactory, ExecutorServiceConfigurer { + private ExecutorService executor; + private boolean shutdownExecutor; + private final NamedFactory<Channel> factory = new ChannelAgentForwarding.ChannelAgentForwardingFactory() { + @Override + public ExecutorService getExecutorService() { + return UnixAgentFactory.this.getExecutorService(); + } + + @Override + public boolean isShutdownOnExit() { + return UnixAgentFactory.this.isShutdownOnExit(); + } + + }; + + public UnixAgentFactory() { + super(); + } + + public UnixAgentFactory(ExecutorService service, boolean shutdown) { + executor = service; + shutdownExecutor = shutdown; + } + + @Override + public ExecutorService getExecutorService() { + return executor; + } + + @Override + public void setExecutorService(ExecutorService service) { + executor = service; + } + + @Override + public boolean isShutdownOnExit() { + return shutdownExecutor; + } + + @Override + public void setShutdownOnExit(boolean shutdown) { + shutdownExecutor = shutdown; + } @Override public NamedFactory<Channel> getChannelForwardingFactory() { - return ChannelAgentForwarding.ChannelAgentForwardingFactory.INSTANCE; + return factory; } @Override @@ -46,7 +91,8 @@ public class UnixAgentFactory implements SshAgentFactory { if (GenericUtils.isEmpty(authSocket)) { throw new SshException("No " + SshAgent.SSH_AUTHSOCKET_ENV_NAME + " value"); } - return new AgentClient(authSocket); + + return new AgentClient(authSocket, getExecutorService(), isShutdownOnExit()); } @Override @@ -55,6 +101,6 @@ public class UnixAgentFactory implements SshAgentFactory { if (!(session instanceof ServerSession)) { throw new IllegalStateException("The session used to create an agent server proxy must be a server session"); } - return new AgentServerProxy(service); + return new AgentServerProxy(service, getExecutorService(), isShutdownOnExit()); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index 1f434b0..e690667 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -212,8 +212,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C } try { if (log.isTraceEnabled()) { - log.trace("Waiting for lock on channel {}, mask={}, cond={}", - new Object[] { this, Integer.valueOf(mask), Integer.valueOf(cond) }); + log.trace("Waiting for lock on channel {}, mask={}, cond={}", this, Integer.valueOf(mask), Integer.valueOf(cond)); } if (timeout > 0) { lock.wait(timeout); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java index 91d6153..102e85f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java @@ -20,14 +20,17 @@ package org.apache.sshd.client.channel; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.channel.ChannelAsyncInputStream; +import org.apache.sshd.common.channel.ChannelAsyncOutputStream; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.channel.ChannelPipedInputStream; import org.apache.sshd.common.channel.ChannelPipedOutputStream; -import org.apache.sshd.common.channel.ChannelAsyncInputStream; -import org.apache.sshd.common.channel.ChannelAsyncOutputStream; import org.apache.sshd.common.future.CloseFuture; +import org.apache.sshd.common.util.threads.ThreadUtils; /** * TODO Add javadoc @@ -35,8 +38,9 @@ import org.apache.sshd.common.future.CloseFuture; * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public class ChannelSession extends AbstractClientChannel { - - private Thread streamPumper; + private ExecutorService pumperService; + private Future<?> pumper; + private boolean shutdownPumper; public ChannelSession() { super("session"); @@ -73,28 +77,50 @@ public class ChannelSession extends AbstractClientChannel { err = pos; invertedErr = pis; } + if (in != null) { - streamPumper = new Thread("ClientInputStreamPump") { - @Override - public void run() { - pumpInputStream(); - } - }; + // allocate a temporary executor service if none provided + ExecutorService service = getExecutorService(); + if ((pumperService = service) == null) { + pumperService = ThreadUtils.newSingleThreadExecutor("ClientInputStreamPump[" + this.toString() + "]"); + } + + // shutdown the temporary executor service if had to create it + shutdownPumper = (pumperService == service) ? isShutdownOnExit() : true; + // Interrupt does not really work and the thread will only exit when // the call to read() will return. So ensure this thread is a daemon // to avoid blocking the whole app - streamPumper.setDaemon(true); - streamPumper.start(); + pumper = pumperService.submit(new Runnable() { + @Override + public void run() { + pumpInputStream(); + } + }); } } } @Override protected void doCloseImmediately() { - if (streamPumper != null) { - streamPumper.interrupt(); - streamPumper = null; + if ((pumper != null) && (pumperService != null) && shutdownPumper && (!pumperService.isShutdown())) { + try { + if (!pumper.isDone()) { + pumper.cancel(true); + } + + pumperService.shutdownNow(); + } catch(Exception e) { + // we log it as DEBUG since it is relatively harmless + if (log.isDebugEnabled()) { + log.debug("Failed (" + e.getClass().getSimpleName() + ") to shutdown stream pumper: " + e.getMessage()); + } + } finally { + pumper = null; + pumperService = null; + } } + super.doCloseImmediately(); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSubsystem.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSubsystem.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSubsystem.java index 9250847..73b132a 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSubsystem.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSubsystem.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; +import org.apache.sshd.common.util.GenericUtils; +import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; /** @@ -35,10 +37,7 @@ public class ChannelSubsystem extends ChannelSession { private final String subsystem; public ChannelSubsystem(String subsystem) { - if (subsystem == null) { - throw new IllegalArgumentException("subsystem must not be null"); - } - this.subsystem = subsystem; + this.subsystem = ValidateUtils.checkNotNullAndNotEmpty(subsystem, "Subsystem may not be null/empty", GenericUtils.EMPTY_OBJECT_ARRAY); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index 6387cac..6abe007 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -20,7 +20,9 @@ package org.apache.sshd.common.channel; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -37,16 +39,20 @@ import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.util.CloseableUtils; +import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.BufferUtils; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer; /** * TODO Add javadoc * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ -public abstract class AbstractChannel extends CloseableUtils.AbstractInnerCloseable implements Channel { +public abstract class AbstractChannel + extends CloseableUtils.AbstractInnerCloseable + implements Channel, ExecutorServiceConfigurer { public static final int DEFAULT_WINDOW_SIZE = 0x200000; public static final int DEFAULT_PACKET_SIZE = 0x8000; @@ -57,6 +63,8 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea Opened, CloseSent, CloseReceived, Closed } + private ExecutorService executor; + private boolean shutdownExecutor; protected final Window localWindow = new Window(this, null, getClass().getName().contains(".client."), true); protected final Window remoteWindow = new Window(this, null, getClass().getName().contains(".client."), false); protected ConnectionService service; @@ -106,6 +114,26 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea } @Override + public ExecutorService getExecutorService() { + return executor; + } + + @Override + public void setExecutorService(ExecutorService service) { + executor = service; + } + + @Override + public boolean isShutdownOnExit() { + return shutdownExecutor; + } + + @Override + public void setShutdownOnExit(boolean shutdown) { + shutdownExecutor = shutdown; + } + + @Override public void handleRequest(Buffer buffer) throws IOException { String req = buffer.getString(); boolean wantReply = buffer.getBoolean(); @@ -189,10 +217,12 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea public boolean isClosing() { return closing; } + @Override public boolean isClosed() { return gracefulFuture.isClosed(); } + @Override public CloseFuture close(boolean immediately) { closing = true; @@ -226,6 +256,15 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea AbstractChannel.this.close(true); } } + + ExecutorService service = getExecutorService(); + if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) { + Collection<?> running = service.shutdownNow(); + if (log.isDebugEnabled()) { + log.debug("Shutdown executor service on close - running count=" + GenericUtils.size(running)); + } + } + return gracefulFuture; } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java index 21e60df..255bfe3 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java @@ -21,6 +21,8 @@ package org.apache.sshd.common.forward; import java.io.IOException; import java.io.OutputStream; import java.net.ConnectException; +import java.util.Collection; +import java.util.concurrent.ExecutorService; import org.apache.sshd.client.future.DefaultOpenFuture; import org.apache.sshd.client.future.OpenFuture; @@ -40,6 +42,8 @@ import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.util.Readable; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; +import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.sshd.server.channel.AbstractServerChannel; import org.apache.sshd.server.channel.OpenChannelException; @@ -49,7 +53,7 @@ import org.apache.sshd.server.channel.OpenChannelException; * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ public class TcpipServerChannel extends AbstractServerChannel { - public abstract static class TcpipFactory implements NamedFactory<Channel> { + public abstract static class TcpipFactory implements NamedFactory<Channel>, ExecutorServiceCarrier { private final Type type; protected TcpipFactory(Type type) { @@ -60,9 +64,22 @@ public class TcpipServerChannel extends AbstractServerChannel { return type; } + @Override // user can override to provide an alternative + public ExecutorService getExecutorService() { + return null; + } + + @Override + public boolean isShutdownOnExit() { + return false; + } + @Override public Channel create() { - return new TcpipServerChannel(getType()); + TcpipServerChannel channel = new TcpipServerChannel(getType()); + channel.setExecutorService(getExecutorService()); + channel.setShutdownOnExit(isShutdownOnExit()); + return channel; } } @@ -132,7 +149,7 @@ public class TcpipServerChannel extends AbstractServerChannel { } final ForwardingFilter filter = getSession().getFactoryManager().getTcpipForwardingFilter(); - if (address == null || filter == null || !filter.canConnect(address, getSession())) { + if ((address == null) || (filter == null) || (!filter.canConnect(address, getSession()))) { super.close(true); f.setException(new OpenChannelException(SshConstants.SSH_OPEN_ADMINISTRATIVELY_PROHIBITED, "Connection denied")); return f; @@ -205,16 +222,33 @@ public class TcpipServerChannel extends AbstractServerChannel { // We also need to dispose of the connector, but unfortunately we // are being invoked by the connector thread or the connector's // own processor thread. Disposing of the connector within either - // causes deadlock. Instead create a new thread to dispose of the + // causes deadlock. Instead create a thread to dispose of the // connector in the background. - // - new Thread("TcpIpServerChannel-ConnectorCleanup") { - @SuppressWarnings("synthetic-access") - @Override - public void run() { - connector.close(true); - } - }.start(); + + ExecutorService service = getExecutorService(); + // allocate a temporary executor service if none provided + final ExecutorService executors = (service == null) + ? ThreadUtils.newSingleThreadExecutor("TcpIpServerChannel-ConnectorCleanup[" + getSession() + "]") + : service + ; + // shutdown the temporary executor service if had to create it + final boolean shutdown = (executors == service) ? isShutdownOnExit() : true; + executors.submit(new Runnable() { + @SuppressWarnings("synthetic-access") + @Override + public void run() { + try { + connector.close(true); + } finally { + if ((executors != null) && (!executors.isShutdown()) && shutdown) { + Collection<Runnable> runners = executors.shutdownNow(); + if (log.isDebugEnabled()) { + log.debug("destroy() - shutdown executor service - runners count=" + ((runners == null) ? 0 : runners.size())); + } + } + } + } + }); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java index cffa5c2..5b79870 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java @@ -90,22 +90,23 @@ public class ThreadUtils { /** * Attempts to find the most suitable {@link ClassLoader} as follows:</BR> * <UL> - * <LI> - * Check the {@link Thread#getContextClassLoader()} value - * </LI> - * <p/> - * <LI> - * If no thread context class loader then check the anchor - * class (if given) for its class loader - * </LI> - * <p/> - * <LI> - * If still no loader available, then use {@link ClassLoader#getSystemClassLoader()} - * </LI> + * <LI> + * Check the {@link Thread#getContextClassLoader()} value + * </LI> + * + * <LI> + * If no thread context class loader then check the anchor + * class (if given) for its class loader + * </LI> + * + * <LI> + * If still no loader available, then use {@link ClassLoader#getSystemClassLoader()} + * </LI> * </UL> - * - * @param anchor - * @return + * @param anchor The anchor {@link Class} to use if no current thread + * - ignored if {@code null} + * context class loader + * @return The resolver {@link ClassLoader} */ public static ClassLoader resolveDefaultClassLoader(Class<?> anchor) { Thread thread = Thread.currentThread(); @@ -125,38 +126,27 @@ public class ThreadUtils { return cl; } - public static ExecutorService newFixedThreadPool( - String poolName, - int nThreads - ) { + public static ExecutorService newFixedThreadPool(String poolName, int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), - new SshdThreadFactory(poolName), - new ThreadPoolExecutor.CallerRunsPolicy()); + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + new SshdThreadFactory(poolName), + new ThreadPoolExecutor.CallerRunsPolicy()); } - public static ExecutorService newCachedThreadPool( - String poolName - ) { + public static ExecutorService newCachedThreadPool(String poolName) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, - 60L, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>(), - new SshdThreadFactory(poolName), - new ThreadPoolExecutor.CallerRunsPolicy()); + 60L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), + new SshdThreadFactory(poolName), + new ThreadPoolExecutor.CallerRunsPolicy()); } - public static ScheduledExecutorService newSingleThreadScheduledExecutor( - String poolName - ) { - return new ScheduledThreadPoolExecutor( - 1, - new SshdThreadFactory(poolName)); + public static ScheduledExecutorService newSingleThreadScheduledExecutor(String poolName) { + return new ScheduledThreadPoolExecutor(1, new SshdThreadFactory(poolName)); } - public static ExecutorService newSingleThreadExecutor( - String poolName - ) { + public static ExecutorService newSingleThreadExecutor(String poolName) { return newFixedThreadPool(poolName, 1); } @@ -168,18 +158,21 @@ public class ThreadUtils { public SshdThreadFactory(String name) { SecurityManager s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : - Thread.currentThread().getThreadGroup(); - namePrefix = "sshd-" + name + "-thread-"; + ThreadGroup parentGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + String effectiveName = name.replace(' ', '-'); + group = new ThreadGroup(parentGroup, "sshd-" + effectiveName + "-group"); + namePrefix = "sshd-" + effectiveName + "-thread-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); - if (t.isDaemon()) - t.setDaemon(false); - if (t.getPriority() != Thread.NORM_PRIORITY) + if (!t.isDaemon()) { + t.setDaemon(true); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); + } return t; } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java index 2cd4638..baf6ca2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommand.java @@ -194,7 +194,7 @@ public class ScpCommand extends AbstractLoggingBean implements Command, Runnable pendingFuture = null; - if ((executors != null) && shutdownExecutor) { + if ((executors != null) && (!executors.isShutdown()) && shutdownExecutor) { Collection<Runnable> runners = executors.shutdownNow(); if (log.isDebugEnabled()) { log.debug("destroy() - shutdown executor service - runners count=" + ((runners == null) ? 0 : runners.size())); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java b/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java index b57705d..ecc8d29 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/sftp/SftpSubsystem.java @@ -2139,7 +2139,7 @@ public class SftpSubsystem extends AbstractLoggingBean implements Command, Runna pendingFuture = null; - if ((executors != null) && shutdownExecutor) { + if ((executors != null) && (!executors.isShutdown()) && shutdownExecutor) { Collection<Runnable> runners = executors.shutdownNow(); if (log.isDebugEnabled()) { log.debug("destroy() - shutdown executor service - runners count=" + ((runners == null) ? 0 : runners.size())); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-core/src/test/java/org/apache/sshd/AgentTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/AgentTest.java b/sshd-core/src/test/java/org/apache/sshd/AgentTest.java index d21828e..d7e4fbc 100644 --- a/sshd-core/src/test/java/org/apache/sshd/AgentTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/AgentTest.java @@ -19,8 +19,6 @@ package org.apache.sshd; import static org.apache.sshd.util.Utils.createTestKeyPairProvider; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.junit.Assume.assumeThat; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -49,11 +47,14 @@ import org.junit.Assume; import org.junit.Test; public class AgentTest extends BaseTestSupport { + public AgentTest() { + super(); + } @Test - public void testAgent() throws Exception { + public void testAgentServer() throws Exception { // TODO: revisit this test to work without BC - Assume.assumeTrue("BoncyCastle not registered", SecurityUtils.isBouncyCastleRegistered()); + Assume.assumeTrue("BouncyCastle not registered", SecurityUtils.isBouncyCastleRegistered()); try(AgentServer agent = new AgentServer()) { String authSocket; @@ -63,23 +64,23 @@ public class AgentTest extends BaseTestSupport { // the native library is not available, so these tests should be skipped authSocket = null; } - assumeThat(authSocket, notNullValue()); + Assume.assumeTrue("Native library N/A", authSocket != null); try(SshAgent client = new AgentClient(authSocket)) { List<SshAgent.Pair<PublicKey, String>> keys = client.getIdentities(); - assertNotNull(keys); - assertEquals(0, keys.size()); + assertNotNull("No initial identities", keys); + assertEquals("Unexpected initial identities size", 0, keys.size()); KeyPair k = Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA); client.addIdentity(k, ""); keys = client.getIdentities(); - assertNotNull(keys); - assertEquals(1, keys.size()); + assertNotNull("No registered identities after add", keys); + assertEquals("Mismatched registered keys size", 1, keys.size()); client.removeIdentity(k.getPublic()); keys = client.getIdentities(); - assertNotNull(keys); - assertEquals(0, keys.size()); + assertNotNull("No registered identities after remove", keys); + assertEquals("Registered keys size not empty", 0, keys.size()); client.removeAllIdentities(); } @@ -89,14 +90,14 @@ public class AgentTest extends BaseTestSupport { @Test public void testAgentForwarding() throws Exception { // TODO: revisit this test to work without BC - Assume.assumeTrue("BoncyCastle not registered", SecurityUtils.isBouncyCastleRegistered()); + Assume.assumeTrue("BouncyCastle not registered", SecurityUtils.isBouncyCastleRegistered()); TestEchoShellFactory shellFactory = new TestEchoShellFactory(); ProxyAgentFactory agentFactory = new ProxyAgentFactory(); LocalAgentFactory localAgentFactory = new LocalAgentFactory(); - + String username = getCurrentTestName(); KeyPair pair = createTestKeyPairProvider("dsaprivkey.pem").loadKey(KeyPairProvider.SSH_DSS); - localAgentFactory.getAgent().addIdentity(pair, "smx"); + localAgentFactory.getAgent().addIdentity(pair, username); try(SshServer sshd1 = SshServer.setUpDefaultServer()) { sshd1.setKeyPairProvider(Utils.createTestHostKeyProvider()); @@ -120,8 +121,8 @@ public class AgentTest extends BaseTestSupport { client1.setAgentFactory(localAgentFactory); client1.start(); - try(ClientSession session1 = client1.connect("smx", "localhost", port1).await().getSession()) { - session1.auth().verify(5L, TimeUnit.SECONDS); + try(ClientSession session1 = client1.connect(username, "localhost", port1).await().getSession()) { + session1.auth().verify(10L, TimeUnit.SECONDS); try(ChannelShell channel1 = session1.createShellChannel(); ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -145,7 +146,7 @@ public class AgentTest extends BaseTestSupport { client2.getProperties().putAll(shellFactory.shell.getEnvironment().getEnv()); client2.start(); - try(ClientSession session2 = client2.connect("smx", "localhost", port2).await().getSession()) { + try(ClientSession session2 = client2.connect(username, "localhost", port2).await().getSession()) { session2.auth().verify(5L, TimeUnit.SECONDS); try(ChannelShell channel2 = session2.createShellChannel()) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-git/src/main/java/org/apache/sshd/git/pack/GitPackCommand.java ---------------------------------------------------------------------- diff --git a/sshd-git/src/main/java/org/apache/sshd/git/pack/GitPackCommand.java b/sshd-git/src/main/java/org/apache/sshd/git/pack/GitPackCommand.java index d8b4c80..6f09d34 100644 --- a/sshd-git/src/main/java/org/apache/sshd/git/pack/GitPackCommand.java +++ b/sshd-git/src/main/java/org/apache/sshd/git/pack/GitPackCommand.java @@ -54,10 +54,12 @@ public class GitPackCommand implements Command, Runnable { this.command = command; } + @Override public void setInputStream(InputStream in) { this.in = in; } + @Override public void setOutputStream(OutputStream out) { this.out = out; if (out instanceof ChannelOutputStream) { @@ -65,6 +67,7 @@ public class GitPackCommand implements Command, Runnable { } } + @Override public void setErrorStream(OutputStream err) { this.err = err; if (err instanceof ChannelOutputStream) { @@ -72,14 +75,19 @@ public class GitPackCommand implements Command, Runnable { } } + @Override public void setExitCallback(ExitCallback callback) { this.callback = callback; } + @Override public void start(Environment env) throws IOException { - new Thread(this).start(); + Thread thread=new Thread(this); + thread.setDaemon(true); + thread.start(); } + @Override public void run() { try { List<String> strs = parseDelimitedString(command, " ", true); @@ -114,6 +122,7 @@ public class GitPackCommand implements Command, Runnable { } } + @Override public void destroy() { //To change body of implemented methods use File | Settings | File Templates. } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/009d832d/sshd-git/src/main/java/org/apache/sshd/git/pgm/GitPgmCommand.java ---------------------------------------------------------------------- diff --git a/sshd-git/src/main/java/org/apache/sshd/git/pgm/GitPgmCommand.java b/sshd-git/src/main/java/org/apache/sshd/git/pgm/GitPgmCommand.java index 22a2f11..210a7cf 100644 --- a/sshd-git/src/main/java/org/apache/sshd/git/pgm/GitPgmCommand.java +++ b/sshd-git/src/main/java/org/apache/sshd/git/pgm/GitPgmCommand.java @@ -48,10 +48,12 @@ public class GitPgmCommand implements Command, Runnable { this.command = command; } + @Override public void setInputStream(InputStream in) { this.in = in; } + @Override public void setOutputStream(OutputStream out) { this.out = out; if (out instanceof ChannelOutputStream) { @@ -59,6 +61,7 @@ public class GitPgmCommand implements Command, Runnable { } } + @Override public void setErrorStream(OutputStream err) { this.err = err; if (err instanceof ChannelOutputStream) { @@ -66,14 +69,19 @@ public class GitPgmCommand implements Command, Runnable { } } + @Override public void setExitCallback(ExitCallback callback) { this.callback = callback; } + @Override public void start(Environment env) throws IOException { - new Thread(this).start(); + Thread thread=new Thread(this); + thread.setDaemon(true); + thread.start(); } + @Override public void run() { try { List<String> strs = parseDelimitedString(command, " ", true); @@ -104,6 +112,7 @@ public class GitPgmCommand implements Command, Runnable { } } + @Override public void destroy() { //To change body of implemented methods use File | Settings | File Templates. }