This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 93ac39c  CAMEL-12229: camel-rabbitmq consumer should be more resilient 
on starting. There may be a situation where a connection is created but cannot 
be started, and then the reconnect logic would reconnect but not start the 
consumer. Now we have logic that calls the start method, and we leverage Camel's 
ServiceSupport for lifecycle of start/stop. (#2491)
93ac39c is described below

commit 93ac39c3a9139661d763e99ccfa5673fe7ebfc10
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Aug 23 14:08:10 2018 +0200

    CAMEL-12229: camel-rabbitmq consumer should be more resilient on starting. 
There may be a situation where a connection is created but cannot be started, 
and then the reconnect logic would reconnect but not start the consumer. Now we 
have logic that calls the start method, and we leverage Camel's ServiceSupport 
for lifecycle of start/stop. (#2491)
---
 .../camel/component/rabbitmq/RabbitConsumer.java   | 33 ++++++++++++----------
 .../camel/component/rabbitmq/RabbitMQConsumer.java | 25 ++++++++--------
 2 files changed, 32 insertions(+), 26 deletions(-)

diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index c916ec7..18da233 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -30,10 +30,12 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class RabbitConsumer implements com.rabbitmq.client.Consumer {
+class RabbitConsumer extends ServiceSupport implements 
com.rabbitmq.client.Consumer {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final RabbitMQConsumer consumer;
     private Channel channel;
@@ -167,21 +169,16 @@ class RabbitConsumer implements 
com.rabbitmq.client.Consumer {
         }
     }
 
-    /**
-     * Bind consumer to channel
-     */
-    public void start() throws IOException {
+    @Override
+    protected void doStart() throws Exception {
         if (channel == null) {
             throw new IOException("The RabbitMQ channel is not open");
         }
         tag = channel.basicConsume(consumer.getEndpoint().getQueue(), 
consumer.getEndpoint().isAutoAck(), "", false, 
consumer.getEndpoint().isExclusiveConsumer(), null, this);
     }
 
-    /**
-     * Unbind consumer from channel
-     */
-    public void stop() throws IOException, TimeoutException {
-        stopping = true;
+    @Override
+    protected void doStop() throws Exception {
         if (channel == null) {
             return;
         }
@@ -200,7 +197,6 @@ class RabbitConsumer implements 
com.rabbitmq.client.Consumer {
             log.error("Thread Interrupted!");
         } finally {
             lock.release();
-
         }
     }
 
@@ -250,7 +246,12 @@ class RabbitConsumer implements 
com.rabbitmq.client.Consumer {
         }
 
         this.consumer.getEndpoint().declareExchangeAndQueue(channel);
-        this.start();
+
+        try {
+            this.start();
+        } catch (Exception e) {
+            throw new IOException("Error starting consumer", e);
+        }
     }
 
     /**
@@ -263,11 +264,11 @@ class RabbitConsumer implements 
com.rabbitmq.client.Consumer {
         if (!sig.isInitiatedByApplication()) {
             // Something else closed the connection so reconnect
             boolean connected = false;
-            while (!connected && !stopping) {
+            while (!connected && !isStopping()) {
                 try {
                     reconnect();
                     connected = true;
-                } catch (IOException | TimeoutException e) {
+                } catch (Exception e) {
                     log.warn("Unable to obtain a RabbitMQ channel. Will try 
again. Caused by: " + e.getMessage() + ". Stacktrace logged at DEBUG logging 
level.");
                     // include stacktrace in DEBUG logging
                     log.debug(e.getMessage(), e);
@@ -297,8 +298,10 @@ class RabbitConsumer implements 
com.rabbitmq.client.Consumer {
      * If the RabbitMQ connection is good this returns without changing
      * anything. If the connection is down it will attempt to reconnect
      */
-    public void reconnect() throws IOException, TimeoutException {
+    public void reconnect() throws Exception {
         if (isChannelOpen()) {
+            // ensure we are started
+            start();
             // The connection is good, so nothing to do
             return;
         } else if (channel != null && !channel.isOpen() && 
isAutomaticRecoveryEnabled()) {
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index e3c7640..9aba524 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -29,6 +29,7 @@ import com.rabbitmq.client.Connection;
 import org.apache.camel.Processor;
 import org.apache.camel.Suspendable;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ServiceHelper;
 
 public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
     private ExecutorService executor;
@@ -67,8 +68,6 @@ public class RabbitMQConsumer extends DefaultConsumer 
implements Suspendable {
 
     /**
      * Returns the exiting open connection or opens a new one
-     * @throws IOException
-     * @throws TimeoutException
      */
     protected synchronized Connection getConnection() throws IOException, 
TimeoutException {
         if (this.conn == null) {
@@ -101,14 +100,18 @@ public class RabbitMQConsumer extends DefaultConsumer 
implements Suspendable {
      * Start the consumers (already created)
      */
     private void startConsumers() {
-
         // Try starting consumers (which will fail if RabbitMQ can't connect)
-        try {
-            for (RabbitConsumer consumer : this.consumers) {
-                consumer.start();
+        Throwable fail = null;
+        // attempt to start all consumers if possible
+        for (RabbitConsumer consumer : this.consumers) {
+            try {
+                ServiceHelper.startService(consumer);
+            } catch (Throwable e) {
+                fail = e;
             }
-        } catch (Exception e) {
-            log.info("Connection failed, will start background thread to 
retry!", e);
+        }
+        if (fail != null) {
+            log.info("Connection failed starting consumers, will start 
background thread to retry!", fail);
             reconnect();
         }
     }
@@ -141,9 +144,9 @@ public class RabbitMQConsumer extends DefaultConsumer 
implements Suspendable {
         }
         for (RabbitConsumer consumer : this.consumers) {
             try {
-                consumer.stop();
-            } catch (TimeoutException e) {
-                log.warn("Timeout occurred while stopping consumer. This 
exception is ignored", e);
+                ServiceHelper.stopAndShutdownService(consumer);
+            } catch (Exception e) {
+                log.warn("Error occurred while stopping consumer. This 
exception is ignored", e);
             }
         }
         this.consumers.clear();

Reply via email to