Repository: camel Updated Branches: refs/heads/camel-2.16.x 3d7051793 -> 556600393 refs/heads/master 0fb50fcf5 -> 1d0bc598b
RabbitMq consumer should be able to suspend and resume Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1d0bc598 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1d0bc598 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1d0bc598 Branch: refs/heads/master Commit: 1d0bc598b3c7e6b7ed7ffa06d6cdf7885ada010d Parents: 0fb50fc Author: Preben Asmussen <preben.asmus...@gmail.com> Authored: Mon Dec 28 15:16:06 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Dec 30 10:53:21 2015 +0100 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConsumer.java | 43 ++++++---- .../rabbitmq/RabbitMQSupendResumeIntTest.java | 84 ++++++++++++++++++++ 2 files changed, 111 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1d0bc598/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index a71769e..eaf2b6c 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -36,7 +36,6 @@ import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.DefaultConsumer; - public class RabbitMQConsumer extends DefaultConsumer { private ExecutorService executor; private Connection conn; @@ -60,7 +59,7 @@ public class RabbitMQConsumer extends DefaultConsumer { @Override public RabbitMQEndpoint getEndpoint() { - return (RabbitMQEndpoint) super.getEndpoint(); + return (RabbitMQEndpoint)super.getEndpoint(); } /** @@ -81,8 +80,7 @@ public class RabbitMQConsumer extends DefaultConsumer { log.debug("Created channel: {}", channel); // setup the basicQos if (endpoint.isPrefetchEnabled()) { - channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), - endpoint.isPrefetchGlobal()); + channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal()); } return channel; } @@ -122,16 +120,25 @@ public class RabbitMQConsumer extends DefaultConsumer { startConsumers(); } catch (Exception e) { log.info("Connection failed, will start background thread to retry!", e); - // Open connection, and start message listener in background - Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval(); - final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L; - startConsumerCallable = new StartConsumerCallable(connectionRetryInterval); - executor.submit(startConsumerCallable); + reconnect(); } } + private void reconnect() { + // Open connection, and start message listener in background + Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval(); + final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L; + startConsumerCallable = new StartConsumerCallable(connectionRetryInterval); + executor.submit(startConsumerCallable); + } + + @Override + protected void doResume() throws Exception { + reconnect(); + } + /** - * If needed, close Connection and Channels + * If needed, close Connection and Channels */ private void closeConnectionAndChannel() throws IOException, TimeoutException { if (startConsumerCallable != null) { @@ -154,6 +161,11 @@ public class RabbitMQConsumer extends DefaultConsumer { } @Override + protected void doSuspend() throws Exception { + closeConnectionAndChannel(); + } + + @Override protected void doStop() throws Exception { closeConnectionAndChannel(); @@ -186,9 +198,7 @@ public class RabbitMQConsumer extends DefaultConsumer { } @Override - public void handleDelivery(String consumerTag, Envelope envelope, - AMQP.BasicProperties properties, byte[] body) throws IOException { - + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body); endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties); @@ -228,7 +238,8 @@ public class RabbitMQConsumer extends DefaultConsumer { channel.basicAck(deliveryTag, false); } } else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) { - // the inOut exchange failed so put the exception in the body and send back + // the inOut exchange failed so put the exception in the body + // and send back msg.setBody(exchange.getException()); exchange.setOut(msg); try { @@ -282,8 +293,8 @@ public class RabbitMQConsumer extends DefaultConsumer { } /** - * Task in charge of opening connection and adding listener when consumer is started - * and broker is not available. + * Task in charge of opening connection and adding listener when consumer is + * started and broker is not available. */ private class StartConsumerCallable implements Callable<Void> { private final long connectionRetryInterval; http://git-wip-us.apache.org/repos/asf/camel/blob/1d0bc598/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java new file mode 100644 index 0000000..fd269a8 --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class RabbitMQSupendResumeIntTest extends CamelTestSupport { + private static final String EXCHANGE = "ex4"; + + @EndpointInject(uri = "mock:result") + private MockEndpoint resultEndpoint; + + @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q3&routingKey=rk3&autoDelete=false") + private Endpoint rabbitMQEndpoint; + + @Produce(uri = "direct:start") + private ProducerTemplate template; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + from("direct:start").routeId("producer").log("sending ${body}").to(rabbitMQEndpoint); + from(rabbitMQEndpoint).routeId("consumer").log("got ${body}").to("mock:result"); + } + }; + } + + @Test + public void testSuspendedResume() throws Exception { + resultEndpoint.expectedMessageCount(1); + resultEndpoint.expectedBodiesReceived("hello"); + + template.sendBody("hello"); + + assertMockEndpointsSatisfied(); + + context.suspendRoute("consumer"); + + // sleep a bit to ensure its properly suspended + Thread.sleep(2000); + + resetMocks(); + resultEndpoint.expectedMessageCount(0); + + template.sendBody("Hello2"); + + assertMockEndpointsSatisfied(1, TimeUnit.SECONDS); + + resetMocks(); + resultEndpoint.expectedBodiesReceived("Hello2"); + resultEndpoint.expectedMessageCount(1); + + context.resumeRoute("consumer"); + + assertMockEndpointsSatisfied(); + } + +}