Reconnect RabbitMQ Producer/Consumer after broker was unavailable
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1191e19f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1191e19f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1191e19f Branch: refs/heads/master Commit: 1191e19fc194b545931530011511a55a8d6d6431 Parents: 0762be7 Author: Gerald Quintana <gerald.quint...@zenika.com> Authored: Mon May 5 15:47:01 2014 +0200 Committer: Gerald Quintana <gerald.quint...@zenika.com> Committed: Mon May 5 15:47:01 2014 +0200 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConsumer.java | 112 +++++++++++++++---- .../component/rabbitmq/RabbitMQEndpoint.java | 22 ++++ .../component/rabbitmq/RabbitMQProducer.java | 33 +++++- .../rabbitmq/RabbitMQEndpointTest.java | 30 +++-- .../rabbitmq/RabbitMQReConnectionIntTest.java | 77 +++++++++++++ 5 files changed, 232 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 501f2a1..0bdc93b 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -17,8 +17,9 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; -import java.util.HashMap; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; @@ -36,6 +37,10 @@ public class RabbitMQConsumer extends DefaultConsumer { private int closeTimeout = 30 * 1000; private final RabbitMQEndpoint endpoint; + /** + * Task in charge of starting consumer + */ + private StartConsumerCallable startConsumerCallable; public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -43,37 +48,56 @@ public class RabbitMQConsumer extends DefaultConsumer { } @Override - protected void doStart() throws Exception { - executor = endpoint.createExecutor(); - log.debug("Using executor {}", executor); - - conn = endpoint.connect(executor); - log.debug("Using conn {}", conn); - - channel = conn.createChannel(); - log.debug("Using channel {}", channel); + public RabbitMQEndpoint getEndpoint() { + return (RabbitMQEndpoint) super.getEndpoint(); + } - channel.exchangeDeclare(endpoint.getExchangeName(), - endpoint.getExchangeType(), - endpoint.isDurable(), - endpoint.isAutoDelete(), - new HashMap<String, Object>()); + /** + * Open connection and channel + */ + private void openConnectionAndChannel() throws IOException { + log.trace("Creating connection..."); + this.conn = getEndpoint().connect(executor); + log.debug("Created connection: {}", conn); + + log.trace("Creating channel..."); + this.channel = conn.createChannel(); + log.debug("Created channel: {}", channel); + } - // need to make sure the queueDeclare is same with the exchange declare - channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, - endpoint.isAutoDelete(), null); - channel.queueBind( - endpoint.getQueue(), - endpoint.getExchangeName(), - endpoint.getRoutingKey() == null ? "" : endpoint - .getRoutingKey()); + /** + * If needed, create Exchange and Queue, then add message listener + */ + private void addConsumer() throws IOException { + getEndpoint().declareExchangeAndQueue(channel); channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new RabbitConsumer(this, channel)); } @Override - protected void doStop() throws Exception { + protected void doStart() throws Exception { + executor = endpoint.createExecutor(); + log.debug("Using executor {}", executor); + try { + openConnectionAndChannel(); + addConsumer(); + } catch (Exception e) { + // Open connection, and start message listener in background + Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval(); + final long connectionRetryInterval= networkRecoveryInterval!=null && networkRecoveryInterval >0? networkRecoveryInterval :100L; + startConsumerCallable=new StartConsumerCallable(connectionRetryInterval); + executor.submit(startConsumerCallable); + } + } + + /** + * If needed, close Connection and Channel + */ + private void closeConnectionAndChannel() throws IOException { + if (startConsumerCallable!=null) { + startConsumerCallable.stop(); + } if (channel != null) { log.debug("Closing channel: {}", channel); channel.close(); @@ -84,6 +108,12 @@ public class RabbitMQConsumer extends DefaultConsumer { conn.close(closeTimeout); conn = null; } + } + + @Override + protected void doStop() throws Exception { + closeConnectionAndChannel(); + if (executor != null) { if (endpoint != null && endpoint.getCamelContext() != null) { endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor); @@ -178,4 +208,38 @@ public class RabbitMQConsumer extends DefaultConsumer { } + /** + * Task in charge of opening connection and adding listener when consumer is started + * and broker is not avaiblable. + */ + private class StartConsumerCallable implements Callable<Void> { + private final long connectionRetryInterval; + private final AtomicBoolean running=new AtomicBoolean(true); + public StartConsumerCallable(long connectionRetryInterval) { + this.connectionRetryInterval = connectionRetryInterval; + } + public void stop() { + running.set(false); + RabbitMQConsumer.this.startConsumerCallable=null; + } + @Override + public Void call() throws Exception { + boolean connectionFailed=true; + // Reconnection loop + while (running.get() && connectionFailed) { + try { + openConnectionAndChannel(); + connectionFailed=false; + } catch (Exception e) { + log.debug("Connection failed, will retry in "+connectionRetryInterval+"ms", e); + Thread.sleep(connectionRetryInterval); + } + } + if (!connectionFailed) { + addConsumer(); + } + stop(); + return null; + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 646a633..7f87f53 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -29,6 +30,7 @@ import javax.net.ssl.TrustManager; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Address; +import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; @@ -126,6 +128,26 @@ public class RabbitMQEndpoint extends DefaultEndpoint { } } + /** + * If needed, declare Exchange, declare Queue and bind them with Routing Key + */ + public void declareExchangeAndQueue(Channel channel) throws IOException { + channel.exchangeDeclare(getExchangeName(), + getExchangeType(), + isDurable(), + isAutoDelete(), + new HashMap<String, Object>()); + if (getQueue()!=null) { + // need to make sure the queueDeclare is same with the exchange declare + channel.queueDeclare(getQueue(), isDurable(), false, + isAutoDelete(), null); + channel.queueBind( + getQueue(), + getExchangeName(), + getRoutingKey() == null ? "" : getRoutingKey()); + } + } + private ConnectionFactory getOrCreateConnectionFactory() { if (connectionFactory == null) { ConnectionFactory factory = new ConnectionFactory(); http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 7763423..03177e2 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -45,11 +45,10 @@ public class RabbitMQProducer extends DefaultProducer { public RabbitMQEndpoint getEndpoint() { return (RabbitMQEndpoint) super.getEndpoint(); } - - @Override - protected void doStart() throws Exception { - this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]"); - + /** + * Open connection and channel + */ + private void openConnectionAndChannel() throws IOException { log.trace("Creating connection..."); this.conn = getEndpoint().connect(executorService); log.debug("Created connection: {}", conn); @@ -60,7 +59,20 @@ public class RabbitMQProducer extends DefaultProducer { } @Override - protected void doStop() throws Exception { + protected void doStart() throws Exception { + this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]"); + + try { + openConnectionAndChannel(); + } catch (IOException e) { + log.warn("Failed to create connection", e); + } + } + + /** + * If needed, close Connection and Channel + */ + private void closeConnectionAndChannel() throws IOException { if (channel != null) { log.debug("Closing channel: {}", channel); channel.close(); @@ -71,6 +83,11 @@ public class RabbitMQProducer extends DefaultProducer { conn.close(closeTimeout); conn = null; } + } + + @Override + protected void doStop() throws Exception { + closeConnectionAndChannel(); if (executorService != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService); executorService = null; @@ -95,6 +112,10 @@ public class RabbitMQProducer extends DefaultProducer { byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class); AMQP.BasicProperties.Builder properties = buildProperties(exchange); + if (channel==null) { + // Open connection and channel lazily + openConnectionAndChannel(); + } channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes); } http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index 11b3675..2db5005 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.rabbitmq; +import java.io.IOException; import java.math.BigDecimal; import java.util.Date; import java.util.HashMap; @@ -26,7 +27,6 @@ import java.util.concurrent.ThreadPoolExecutor; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Address; -import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.impl.LongStringHelper; @@ -132,7 +132,13 @@ public class RabbitMQEndpointTest extends CamelTestSupport { } private ConnectionFactory createConnectionFactory(String uri) { - RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class); + RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class); + try { + endpoint.connect(Executors.newSingleThreadExecutor()); + } catch(IOException ioExc) { + // Doesn't matter if RabbitMQ is not available + log.debug("RabbitMQ not available", ioExc); + } return endpoint.getConnectionFactory(); } @@ -158,16 +164,16 @@ public class RabbitMQEndpointTest extends CamelTestSupport { @Test public void testCreateConnectionFactoryCustom() throws Exception { ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange" - + "?username=userxxx" - + "&password=passxxx" - + "&connectionTimeout=123" - + "&requestedChannelMax=456" - + "&requestedFrameMax=789" - + "&requestedHeartbeat=987" - + "&sslProtocol=true" - + "&automaticRecoveryEnabled=true" - + "&networkRecoveryInterval=654" - + "&topologyRecoveryEnabled=false"); + + "?username=userxxx" + + "&password=passxxx" + + "&connectionTimeout=123" + + "&requestedChannelMax=456" + + "&requestedFrameMax=789" + + "&requestedHeartbeat=987" + + "&sslProtocol=true" + + "&automaticRecoveryEnabled=true" + + "&networkRecoveryInterval=654" + + "&topologyRecoveryEnabled=false"); assertEquals("localhost", connectionFactory.getHost()); assertEquals(1234, connectionFactory.getPort()); http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java new file mode 100644 index 0000000..6a63948 --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java @@ -0,0 +1,77 @@ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.AlreadyClosedException; +import org.apache.camel.*; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import java.net.ConnectException; +import java.util.concurrent.TimeUnit; + +/** + * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker + * is not avaibable. + * <ul> + * <li>Stop the broker</li> + * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li> + * <li>Start the broker: the producer sends messages, and the consumer receives messages</li> + * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li> + * <li>Start the broker: the producer sends messages, and the consumer receives messages</li> + * </ul> + */ +public class RabbitMQReConnectionIntTest extends CamelTestSupport { + private static final String EXCHANGE = "ex3"; + + @Produce(uri = "direct:rabbitMQ") + protected ProducerTemplate directProducer; + + @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" + + "&queue=q3&routingKey=rk3"+ + "&automaticRecoveryEnabled=true" + + "&requestedHeartbeat=1000" + + "&connectionTimeout=5000") + private Endpoint rabbitMQEndpoint; + + @EndpointInject(uri = "mock:result") + private MockEndpoint mockEndpoint; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + from("direct:rabbitMQ") + .id("producingRoute") + .onException(AlreadyClosedException.class, ConnectException.class) + .maximumRedeliveries(10) + .redeliveryDelay(500L) + .end() + .log("Sending message") + .inOnly(rabbitMQEndpoint); + from(rabbitMQEndpoint) + .id("consumingRoute") + .log("Receiving message") + .to(mockEndpoint); + } + }; + } + @Test + public void testSendEndReceive() throws Exception { + int nbMessages=100; + int failedMessages=0; + for(int i=0;i<nbMessages;i++) { + try { + directProducer.sendBodyAndHeader("Message #"+i, RabbitMQConstants.ROUTING_KEY, "rk3"); + } catch (CamelExecutionException e) { + log.debug("Can not send message", e); + failedMessages++; + } + Thread.sleep(500L); + } + mockEndpoint.assertExchangeReceived(nbMessages-failedMessages); + assertMockEndpointsSatisfied(5, TimeUnit.SECONDS); + } +}