Repository: camel Updated Branches: refs/heads/camel-2.19.x 2c69684cb -> 7bcefc1a9 refs/heads/master 5b08f5034 -> 4e179e647
CAMEL-11791: Enhanced reconnection for rabbitmq consumer and producer (including queue/exchange deletion) Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c5443cf5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c5443cf5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c5443cf5 Branch: refs/heads/master Commit: c5443cf52f8662a7e19ff40bdde3c3ae8751566c Parents: 5b08f50 Author: Veiga Ortiz, Héctor <hector.veiga-or...@here.com> Authored: Mon Sep 25 14:47:18 2017 -0400 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Sep 26 08:28:50 2017 +0200 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitConsumer.java | 26 ++++++++++++++------ .../component/rabbitmq/RabbitMQConsumer.java | 12 ++++++--- .../component/rabbitmq/RabbitMQEndpoint.java | 2 +- .../rabbitmq/RabbitMQMessagePublisher.java | 1 + .../component/rabbitmq/RabbitMQProducer.java | 8 +++++- .../rabbitmq/pool/PoolableChannelFactory.java | 6 ++++- 6 files changed, 41 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java index 6c20b57..e96367c 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java @@ -237,8 +237,16 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { * the defined consumer tag (client- or server-generated) */ public 
void handleCancel(String consumerTag) throws IOException { - // no work to do - log.debug("Received cancel signal on the rabbitMQ channel"); + log.debug("Received cancel signal on the rabbitMQ channel."); + + try { + channel.basicCancel(tag); + } catch (Exception e) { + //no-op + } + + this.consumer.getEndpoint().declareExchangeAndQueue(channel); + this.start(); } /** @@ -287,12 +295,16 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { if (isChannelOpen()) { // The connection is good, so nothing to do return; + } else if (!isChannelOpen() && this.consumer.getEndpoint().getAutomaticRecoveryEnabled()) { + // Still need to wait for channel to re-open + throw new IOException("Waiting for channel to re-open."); + } else if (!this.consumer.getEndpoint().getAutomaticRecoveryEnabled()) { + log.info("Attempting to open a new rabbitMQ channel"); + Connection conn = consumer.getConnection(); + channel = openChannel(conn); + // Register the channel to the tag + start(); } - log.info("Attempting to open a new rabbitMQ channel"); - Connection conn = consumer.getConnection(); - channel = openChannel(conn); - // Register the channel to the tag - start(); } private boolean isChannelOpen() { http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 9c02cb7..95a6609 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -71,12 +71,16 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { * @throws 
TimeoutException */ protected synchronized Connection getConnection() throws IOException, TimeoutException { - if (this.conn != null && this.conn.isOpen()) { + if (this.conn == null) { + openConnection(); + return this.conn; + } else if (!this.conn.isOpen() && this.endpoint.getAutomaticRecoveryEnabled()) { + return this.conn; + } else { + log.debug("The existing connection is closed"); + openConnection(); return this.conn; } - log.debug("The existing connection is closed"); - openConnection(); - return this.conn; } http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index e3cb28d..c3cd8bf 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -117,7 +117,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { @UriParam(label = "advanced") private Boolean automaticRecoveryEnabled; @UriParam(label = "advanced") - private Integer networkRecoveryInterval; + private Integer networkRecoveryInterval = 5000; @UriParam(label = "advanced") private Boolean topologyRecoveryEnabled; @UriParam(label = "consumer") http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java index 85e657f..a61d470 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java @@ -24,6 +24,7 @@ import java.io.Serializable; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ReturnListener; http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 3e45c15..759039d 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -79,9 +79,11 @@ public class RabbitMQProducer extends DefaultAsyncProducer { channel = channelPool.borrowObject(); } if (!channel.isOpen()) { - log.warn("Got a closed channel from the pool"); + log.warn("Got a closed channel from the pool. 
Invalidating and borrowing a new one from the pool."); + channelPool.invalidateObject(channel); // Reconnect if another thread hasn't yet checkConnectionAndChannelPool(); + attemptDeclaration(); channel = channelPool.borrowObject(); } try { @@ -103,6 +105,10 @@ public class RabbitMQProducer extends DefaultAsyncProducer { log.trace("Creating channel pool..."); channelPool = new GenericObjectPool<Channel>(new PoolableChannelFactory(this.conn), getEndpoint().getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getEndpoint().getChannelPoolMaxWait()); + attemptDeclaration(); + } + + private synchronized void attemptDeclaration() throws Exception { if (getEndpoint().isDeclare()) { execute(new ChannelCallback<Void>() { @Override http://git-wip-us.apache.org/repos/asf/camel/blob/c5443cf5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java index b10201f..ea0e619 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java @@ -41,7 +41,11 @@ public class PoolableChannelFactory implements PoolableObjectFactory<Channel> { @Override public void destroyObject(Channel t) throws Exception { - t.close(); + try { + t.close(); + } catch (Exception e) { + //no-op + } } @Override