Repository: camel
Updated Branches:
  refs/heads/camel-2.19.x 2c69684cb -> 7bcefc1a9
  refs/heads/master 5b08f5034 -> 4e179e647


CAMEL-11791: Enhanced reconnection for rabbitmq consumer and producer 
(including queue/exchange deletion)


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c5443cf5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c5443cf5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c5443cf5

Branch: refs/heads/master
Commit: c5443cf52f8662a7e19ff40bdde3c3ae8751566c
Parents: 5b08f50
Author: Veiga Ortiz, Héctor <hector.veiga-or...@here.com>
Authored: Mon Sep 25 14:47:18 2017 -0400
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Sep 26 08:28:50 2017 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitConsumer.java      | 26 ++++++++++++++------
 .../component/rabbitmq/RabbitMQConsumer.java    | 12 ++++++---
 .../component/rabbitmq/RabbitMQEndpoint.java    |  2 +-
 .../rabbitmq/RabbitMQMessagePublisher.java      |  1 +
 .../component/rabbitmq/RabbitMQProducer.java    |  8 +++++-
 .../rabbitmq/pool/PoolableChannelFactory.java   |  6 ++++-
 6 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 6c20b57..e96367c 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -237,8 +237,16 @@ class RabbitConsumer implements 
com.rabbitmq.client.Consumer {
      *            the defined consumer tag (client- or server-generated)
      */
     public void handleCancel(String consumerTag) throws IOException {
-        // no work to do
-        log.debug("Received cancel signal on the rabbitMQ channel");
+        log.debug("Received cancel signal on the rabbitMQ channel.");
+
+        try {
+            channel.basicCancel(tag);
+        } catch (Exception e) {
+            //no-op
+        }
+
+        this.consumer.getEndpoint().declareExchangeAndQueue(channel);
+        this.start();
     }
 
     /**
@@ -287,12 +295,16 @@ class RabbitConsumer implements 
com.rabbitmq.client.Consumer {
         if (isChannelOpen()) {
             // The connection is good, so nothing to do
             return;
+        } else if (!isChannelOpen() && 
this.consumer.getEndpoint().getAutomaticRecoveryEnabled()) {
+            // Still need to wait for channel to re-open
+            throw new IOException("Waiting for channel to re-open.");
+        } else if (!this.consumer.getEndpoint().getAutomaticRecoveryEnabled()) 
{
+            log.info("Attempting to open a new rabbitMQ channel");
+            Connection conn = consumer.getConnection();
+            channel = openChannel(conn);
+            // Register the channel to the tag
+            start();
         }
-        log.info("Attempting to open a new rabbitMQ channel");
-        Connection conn = consumer.getConnection();
-        channel = openChannel(conn);
-        // Register the channel to the tag
-        start();
     }
 
     private boolean isChannelOpen() {

http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 9c02cb7..95a6609 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -71,12 +71,16 @@ public class RabbitMQConsumer extends DefaultConsumer 
implements Suspendable {
      * @throws TimeoutException
      */
     protected synchronized Connection getConnection() throws IOException, 
TimeoutException {
-        if (this.conn != null && this.conn.isOpen()) {
+        if (this.conn == null) {
+            openConnection();
+            return this.conn;
+        } else if (!this.conn.isOpen() && 
this.endpoint.getAutomaticRecoveryEnabled()) {
+            return this.conn;
+        } else {
+            log.debug("The existing connection is closed");
+            openConnection();
             return this.conn;
         }
-        log.debug("The existing connection is closed");
-        openConnection();
-        return this.conn;
     }
 
 

http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index e3cb28d..c3cd8bf 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -117,7 +117,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
     @UriParam(label = "advanced")
     private Boolean automaticRecoveryEnabled;
     @UriParam(label = "advanced")
-    private Integer networkRecoveryInterval;
+    private Integer networkRecoveryInterval = 5000;
     @UriParam(label = "advanced")
     private Boolean topologyRecoveryEnabled;
     @UriParam(label = "consumer")

http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index 85e657f..a61d470 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AlreadyClosedException;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.ReturnListener;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 3e45c15..759039d 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -79,9 +79,11 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
             channel = channelPool.borrowObject();
         }
         if (!channel.isOpen()) {
-            log.warn("Got a closed channel from the pool");
+            log.warn("Got a closed channel from the pool. Invalidating and 
borrowing a new one from the pool.");
+            channelPool.invalidateObject(channel);
             // Reconnect if another thread hasn't yet
             checkConnectionAndChannelPool();
+            attemptDeclaration();
             channel = channelPool.borrowObject();
         }
         try {
@@ -103,6 +105,10 @@ public class RabbitMQProducer extends DefaultAsyncProducer 
{
         log.trace("Creating channel pool...");
         channelPool = new GenericObjectPool<Channel>(new 
PoolableChannelFactory(this.conn), getEndpoint().getChannelPoolMaxSize(),
                 GenericObjectPool.WHEN_EXHAUSTED_BLOCK, 
getEndpoint().getChannelPoolMaxWait());
+        attemptDeclaration();
+    }
+
+    private synchronized void attemptDeclaration() throws Exception {
         if (getEndpoint().isDeclare()) {
             execute(new ChannelCallback<Void>() {
                 @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
index b10201f..ea0e619 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
@@ -41,7 +41,11 @@ public class PoolableChannelFactory implements 
PoolableObjectFactory<Channel> {
 
     @Override
     public void destroyObject(Channel t) throws Exception {
-        t.close();
+        try {
+            t.close();
+        } catch (Exception e) {
+            //no-op
+        }
     }
 
     @Override

Reply via email to