This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 93ac39c CAMEL-12229: camel-rabbitmq consumer should be more resilient on starting. There may be a situation where a connection is created but cannot be started, and then the reconnect logic would reconnect but not start the consumer. Now we have logic that calls the start method, and we leverage Camel's ServiceSupport for lifecycle of start/stop. (#2491) 93ac39c is described below commit 93ac39c3a9139661d763e99ccfa5673fe7ebfc10 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Aug 23 14:08:10 2018 +0200 CAMEL-12229: camel-rabbitmq consumer should be more resilient on starting. There may be a situation where a connection is created but cannot be started, and then the reconnect logic would reconnect but not start the consumer. Now we have logic that calls the start method, and we leverage Camel's ServiceSupport for lifecycle of start/stop. (#2491) --- .../camel/component/rabbitmq/RabbitConsumer.java | 33 ++++++++++++---------- .../camel/component/rabbitmq/RabbitMQConsumer.java | 25 ++++++++-------- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java index c916ec7..18da233 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java @@ -30,10 +30,12 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class RabbitConsumer implements com.rabbitmq.client.Consumer { +class RabbitConsumer 
extends ServiceSupport implements com.rabbitmq.client.Consumer { private final Logger log = LoggerFactory.getLogger(getClass()); private final RabbitMQConsumer consumer; private Channel channel; @@ -167,21 +169,16 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { } } - /** - * Bind consumer to channel - */ - public void start() throws IOException { + @Override + protected void doStart() throws Exception { if (channel == null) { throw new IOException("The RabbitMQ channel is not open"); } tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), "", false, consumer.getEndpoint().isExclusiveConsumer(), null, this); } - /** - * Unbind consumer from channel - */ - public void stop() throws IOException, TimeoutException { - stopping = true; + @Override + protected void doStop() throws Exception { if (channel == null) { return; } @@ -200,7 +197,6 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { log.error("Thread Interrupted!"); } finally { lock.release(); - } } @@ -250,7 +246,12 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { } this.consumer.getEndpoint().declareExchangeAndQueue(channel); - this.start(); + + try { + this.start(); + } catch (Exception e) { + throw new IOException("Error starting consumer", e); + } } /** @@ -263,11 +264,11 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { if (!sig.isInitiatedByApplication()) { // Something else closed the connection so reconnect boolean connected = false; - while (!connected && !stopping) { + while (!connected && !isStopping()) { try { reconnect(); connected = true; - } catch (IOException | TimeoutException e) { + } catch (Exception e) { log.warn("Unable to obtain a RabbitMQ channel. Will try again. Caused by: " + e.getMessage() + ". 
Stacktrace logged at DEBUG logging level."); // include stacktrace in DEBUG logging log.debug(e.getMessage(), e); @@ -297,8 +298,10 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { * If the RabbitMQ connection is good this returns without changing * anything. If the connection is down it will attempt to reconnect */ - public void reconnect() throws IOException, TimeoutException { + public void reconnect() throws Exception { if (isChannelOpen()) { + // ensure we are started + start(); // The connection is good, so nothing to do return; } else if (channel != null && !channel.isOpen() && isAutomaticRecoveryEnabled()) { diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index e3c7640..9aba524 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -29,6 +29,7 @@ import com.rabbitmq.client.Connection; import org.apache.camel.Processor; import org.apache.camel.Suspendable; import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.util.ServiceHelper; public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { private ExecutorService executor; @@ -67,8 +68,6 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { /** * Returns the exiting open connection or opens a new one - * @throws IOException - * @throws TimeoutException */ protected synchronized Connection getConnection() throws IOException, TimeoutException { if (this.conn == null) { @@ -101,14 +100,18 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { * Start the consumers (already created) */ private void startConsumers() { - // Try starting consumers (which will fail if RabbitMQ can't connect) 
- try { - for (RabbitConsumer consumer : this.consumers) { - consumer.start(); + Throwable fail = null; + // attempt to start all consumers if possible + for (RabbitConsumer consumer : this.consumers) { + try { + ServiceHelper.startService(consumer); + } catch (Throwable e) { + fail = e; } - } catch (Exception e) { - log.info("Connection failed, will start background thread to retry!", e); + } + if (fail != null) { + log.info("Connection failed starting consumers, will start background thread to retry!", fail); reconnect(); } } @@ -141,9 +144,9 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { } for (RabbitConsumer consumer : this.consumers) { try { - consumer.stop(); - } catch (TimeoutException e) { - log.warn("Timeout occurred while stopping consumer. This exception is ignored", e); + ServiceHelper.stopAndShutdownService(consumer); + } catch (Exception e) { + log.warn("Error occurred while stopping consumer. This exception is ignored", e); } } this.consumers.clear();