CAMEL-6947: RabbitMQ producer should start and stop more cleanly, such as making 
sure to call close and shut down the thread pool.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8228cfee
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8228cfee
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8228cfee

Branch: refs/heads/camel-2.12.x
Commit: 8228cfee9e8711131eeec57373c70611ba27edb7
Parents: 6c13354
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Nov 8 15:41:44 2013 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Nov 8 15:42:02 2013 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQProducer.java    | 49 +++++++++++++++++---
 1 file changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8228cfee/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 3bebb3f..5bf4269 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -21,7 +21,7 @@ import java.math.BigDecimal;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
@@ -32,13 +32,13 @@ import org.apache.camel.util.ObjectHelper;
 
 public class RabbitMQProducer extends DefaultProducer {
 
-    private final Connection conn;
-    private final Channel channel;
+    private int closeTimeout = 30 * 1000;
+    private Connection conn;
+    private Channel channel;
+    private ExecutorService executorService;
 
     public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
         super(endpoint);
-        this.conn = endpoint.connect(Executors.newSingleThreadExecutor());
-        this.channel = conn.createChannel();
     }
 
     @Override
@@ -46,8 +46,35 @@ public class RabbitMQProducer extends DefaultProducer {
         return (RabbitMQEndpoint) super.getEndpoint();
     }
 
-    public void shutdown() throws IOException {
-        conn.close();
+    @Override
+    protected void doStart() throws Exception {
+        this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
+
+        log.trace("Creating connection...");
+        this.conn = getEndpoint().connect(executorService);
+        log.debug("Created connection: {}", conn);
+
+        log.trace("Creating channel...");
+        this.channel = conn.createChannel();
+        log.debug("Created channel: {}", channel);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (channel != null) {
+            log.debug("Closing channel: {}", channel);
+            channel.close();
+            channel = null;
+        }
+        if (conn != null) {
+            log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
+            conn.close(closeTimeout);
+            conn = null;
+        }
+        if (executorService != null) {
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
     }
 
     @Override
@@ -179,4 +206,12 @@ public class RabbitMQProducer extends DefaultProducer {
         }
         return null;
     }
+
+    public int getCloseTimeout() {
+        return closeTimeout;
+    }
+
+    public void setCloseTimeout(int closeTimeout) {
+        this.closeTimeout = closeTimeout;
+    }
 }

Reply via email to