Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x d5914d468 -> cf4f61a02
CAMEL-9319 SshClient resource leak when used from ProducerTemplate Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cf4f61a0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cf4f61a0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cf4f61a0 Branch: refs/heads/camel-2.16.x Commit: cf4f61a02e09668adbcc3a9f943d80af7a78cf5a Parents: d5914d4 Author: Andrea Cosentino <anco...@gmail.com> Authored: Mon Nov 16 17:09:04 2015 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Mon Nov 16 17:29:14 2015 +0100 ---------------------------------------------------------------------- .../apache/camel/component/ssh/SshConsumer.java | 23 +++- .../apache/camel/component/ssh/SshEndpoint.java | 105 +-------------- .../apache/camel/component/ssh/SshHelper.java | 127 +++++++++++++++++++ .../apache/camel/component/ssh/SshProducer.java | 23 +++- 4 files changed, 172 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/cf4f61a0/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java index 330972e..682268a 100644 --- a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java +++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java @@ -19,19 +19,40 @@ package org.apache.camel.component.ssh; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.ScheduledPollConsumer; +import org.apache.sshd.SshClient; public class SshConsumer extends ScheduledPollConsumer { private final SshEndpoint endpoint; + + private SshClient client; public 
SshConsumer(SshEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + client = SshClient.setUpDefaultClient(); + client.start(); + } + + @Override + protected void doStop() throws Exception { + if (client != null) { + client.stop(); + client = null; + } + + super.doStop(); + } @Override protected int poll() throws Exception { String command = endpoint.getPollCommand(); - SshResult result = endpoint.sendExecCommand(command); + SshResult result = SshHelper.sendExecCommand(command, endpoint, client); Exchange exchange = endpoint.createExchange(); exchange.getIn().setBody(result.getStdout()); http://git-wip-us.apache.org/repos/asf/camel/blob/cf4f61a0/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java index 8235e1b..af709f4 100644 --- a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java +++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java @@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory; public class SshEndpoint extends ScheduledPollEndpoint { protected final Logger log = LoggerFactory.getLogger(getClass()); - private SshClient client; @UriParam private SshConfiguration sshConfiguration; @@ -75,109 +74,7 @@ public class SshEndpoint extends ScheduledPollEndpoint { @Override public boolean isSingleton() { // SshClient is not thread-safe to be shared - return false; - } - - public SshResult sendExecCommand(String command) throws Exception { - SshResult result = null; - - if (getConfiguration() == null) { - throw new IllegalStateException("Configuration must be set"); - } - - ConnectFuture connectFuture = 
client.connect(null, getHost(), getPort()); - - // Wait getTimeout milliseconds for connect operation to complete - connectFuture.await(getTimeout()); - - if (!connectFuture.isDone() || !connectFuture.isConnected()) { - final String msg = "Failed to connect to " + getHost() + ":" + getPort() + " within timeout " + getTimeout() + "ms"; - log.debug(msg); - throw new RuntimeCamelException(msg); - } - - log.debug("Connected to {}:{}", getHost(), getPort()); - - ClientChannel channel = null; - ClientSession session = null; - - try { - AuthFuture authResult; - session = connectFuture.getSession(); - - KeyPairProvider keyPairProvider; - final String certResource = getCertResource(); - if (certResource != null) { - log.debug("Attempting to authenticate using ResourceKey '{}'...", certResource); - keyPairProvider = new ResourceHelperKeyPairProvider(new String[]{certResource}, getCamelContext().getClassResolver()); - } else { - keyPairProvider = getKeyPairProvider(); - } - - if (keyPairProvider != null) { - log.debug("Attempting to authenticate username '{}' using Key...", getUsername()); - KeyPair pair = keyPairProvider.loadKey(getKeyType()); - authResult = session.authPublicKey(getUsername(), pair); - } else { - log.debug("Attempting to authenticate username '{}' using Password...", getUsername()); - authResult = session.authPassword(getUsername(), getPassword()); - } - - authResult.await(getTimeout()); - - if (!authResult.isDone() || authResult.isFailure()) { - log.debug("Failed to authenticate"); - throw new RuntimeCamelException("Failed to authenticate username " + getUsername()); - } - - channel = session.createChannel(ClientChannel.CHANNEL_EXEC, command); - - ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{0}); - channel.setIn(in); - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - channel.setOut(out); - - ByteArrayOutputStream err = new ByteArrayOutputStream(); - channel.setErr(err); - OpenFuture openFuture = channel.open(); - 
openFuture.await(getTimeout()); - if (openFuture.isOpened()) { - channel.waitFor(ClientChannel.CLOSED, 0); - result = new SshResult(command, channel.getExitStatus(), - new ByteArrayInputStream(out.toByteArray()), - new ByteArrayInputStream(err.toByteArray())); - - } - return result; - } finally { - if (channel != null) { - channel.close(true); - } - // need to make sure the session is closed - if (session != null) { - session.close(false); - } - } - - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - - client = SshClient.setUpDefaultClient(); - client.start(); - } - - @Override - protected void doStop() throws Exception { - if (client != null) { - client.stop(); - client = null; - } - - super.doStop(); + return true; } public SshConfiguration getConfiguration() { http://git-wip-us.apache.org/repos/asf/camel/blob/cf4f61a0/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java new file mode 100644 index 0000000..6c9ea02 --- /dev/null +++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ssh; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.security.KeyPair; + +import org.apache.camel.RuntimeCamelException; +import org.apache.sshd.ClientChannel; +import org.apache.sshd.ClientSession; +import org.apache.sshd.SshClient; +import org.apache.sshd.client.future.AuthFuture; +import org.apache.sshd.client.future.ConnectFuture; +import org.apache.sshd.client.future.OpenFuture; +import org.apache.sshd.common.KeyPairProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class SshHelper { + + protected static final Logger LOG = LoggerFactory.getLogger(SshHelper.class); + + private SshHelper() { + } + + public static SshResult sendExecCommand(String command, SshEndpoint endpoint, SshClient client) throws Exception { + SshResult result = null; + + SshConfiguration configuration = endpoint.getConfiguration(); + + if (configuration == null) { + throw new IllegalStateException("Configuration must be set"); + } + + ConnectFuture connectFuture = client.connect(null, configuration.getHost(), configuration.getPort()); + + // Wait getTimeout milliseconds for connect operation to complete + connectFuture.await(configuration.getTimeout()); + + if (!connectFuture.isDone() || !connectFuture.isConnected()) { + final String msg = "Failed to connect to " + configuration.getHost() + ":" + configuration.getPort() + " within timeout " + configuration.getTimeout() + "ms"; + LOG.debug(msg); + throw new RuntimeCamelException(msg); + } + + 
LOG.debug("Connected to {}:{}", configuration.getHost(), configuration.getPort()); + + ClientChannel channel = null; + ClientSession session = null; + + try { + AuthFuture authResult; + session = connectFuture.getSession(); + + KeyPairProvider keyPairProvider; + final String certResource = configuration.getCertResource(); + if (certResource != null) { + LOG.debug("Attempting to authenticate using ResourceKey '{}'...", certResource); + keyPairProvider = new ResourceHelperKeyPairProvider(new String[]{certResource}, endpoint.getCamelContext().getClassResolver()); + } else { + keyPairProvider = configuration.getKeyPairProvider(); + } + + if (keyPairProvider != null) { + LOG.debug("Attempting to authenticate username '{}' using Key...", configuration.getUsername()); + KeyPair pair = keyPairProvider.loadKey(configuration.getKeyType()); + authResult = session.authPublicKey(configuration.getUsername(), pair); + } else { + LOG.debug("Attempting to authenticate username '{}' using Password...", configuration.getUsername()); + authResult = session.authPassword(configuration.getUsername(), configuration.getPassword()); + } + + authResult.await(configuration.getTimeout()); + + if (!authResult.isDone() || authResult.isFailure()) { + LOG.debug("Failed to authenticate"); + throw new RuntimeCamelException("Failed to authenticate username " + configuration.getUsername()); + } + + channel = session.createChannel(ClientChannel.CHANNEL_EXEC, command); + + ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{0}); + channel.setIn(in); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + channel.setOut(out); + + ByteArrayOutputStream err = new ByteArrayOutputStream(); + channel.setErr(err); + OpenFuture openFuture = channel.open(); + openFuture.await(configuration.getTimeout()); + if (openFuture.isOpened()) { + channel.waitFor(ClientChannel.CLOSED, 0); + result = new SshResult(command, channel.getExitStatus(), + new ByteArrayInputStream(out.toByteArray()), + new 
ByteArrayInputStream(err.toByteArray())); + + } + return result; + } finally { + if (channel != null) { + channel.close(true); + } + // need to make sure the session is closed + if (session != null) { + session.close(false); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/cf4f61a0/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java index f29c854..a2867cb 100644 --- a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java +++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java @@ -20,14 +20,35 @@ import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; +import org.apache.sshd.SshClient; public class SshProducer extends DefaultProducer { private SshEndpoint endpoint; + + private SshClient client; public SshProducer(SshEndpoint endpoint) { super(endpoint); this.endpoint = endpoint; } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + client = SshClient.setUpDefaultClient(); + client.start(); + } + + @Override + protected void doStop() throws Exception { + if (client != null) { + client.stop(); + client = null; + } + + super.doStop(); + } @Override public void process(Exchange exchange) throws Exception { @@ -35,7 +56,7 @@ public class SshProducer extends DefaultProducer { String command = in.getMandatoryBody(String.class); try { - SshResult result = endpoint.sendExecCommand(command); + SshResult result = SshHelper.sendExecCommand(command, endpoint, client); exchange.getOut().setBody(result.getStdout()); exchange.getOut().setHeader(SshResult.EXIT_VALUE, result.getExitValue()); 
exchange.getOut().setHeader(SshResult.STDERR, result.getStderr());