CAMEL-6947: RabbitMQ producer should start and stop more cleanly, e.g. by making sure to close the channel and connection and to shut down the thread pool.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8228cfee Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8228cfee Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8228cfee Branch: refs/heads/camel-2.12.x Commit: 8228cfee9e8711131eeec57373c70611ba27edb7 Parents: 6c13354 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Nov 8 15:41:44 2013 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Nov 8 15:42:02 2013 +0100 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQProducer.java | 49 +++++++++++++++++--- 1 file changed, 42 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8228cfee/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 3bebb3f..5bf4269 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -21,7 +21,7 @@ import java.math.BigDecimal; import java.util.Date; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; @@ -32,13 +32,13 @@ import org.apache.camel.util.ObjectHelper; public class RabbitMQProducer extends DefaultProducer { - private final Connection conn; - private final Channel channel; + private int closeTimeout = 30 * 1000; + private Connection conn; + private 
Channel channel; + private ExecutorService executorService; public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException { super(endpoint); - this.conn = endpoint.connect(Executors.newSingleThreadExecutor()); - this.channel = conn.createChannel(); } @Override @@ -46,8 +46,35 @@ public class RabbitMQProducer extends DefaultProducer { return (RabbitMQEndpoint) super.getEndpoint(); } - public void shutdown() throws IOException { - conn.close(); + @Override + protected void doStart() throws Exception { + this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]"); + + log.trace("Creating connection..."); + this.conn = getEndpoint().connect(executorService); + log.debug("Created connection: {}", conn); + + log.trace("Creating channel..."); + this.channel = conn.createChannel(); + log.debug("Created channel: {}", channel); + } + + @Override + protected void doStop() throws Exception { + if (channel != null) { + log.debug("Closing channel: {}", channel); + channel.close(); + channel = null; + } + if (conn != null) { + log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout); + conn.close(closeTimeout); + conn = null; + } + if (executorService != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService); + executorService = null; + } } @Override @@ -179,4 +206,12 @@ public class RabbitMQProducer extends DefaultProducer { } return null; } + + public int getCloseTimeout() { + return closeTimeout; + } + + public void setCloseTimeout(int closeTimeout) { + this.closeTimeout = closeTimeout; + } }