Repository: camel Updated Branches: refs/heads/camel-2.16.x 302c91d1a -> 02fd57d85
CAMEL-9505: RabbitMQConsumer don't use Camel ExceptionHandler BEFORE requeing message Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/02fd57d8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/02fd57d8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/02fd57d8 Branch: refs/heads/camel-2.16.x Commit: 02fd57d8577b3f2c1b9a6007afad4b57e65ec093 Parents: 302c91d Author: Andrea Cosentino <anco...@gmail.com> Authored: Wed Jan 13 14:05:34 2016 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Wed Jan 13 14:11:31 2016 +0100 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConsumer.java | 6 +- .../RabbitMQRequeueHandledExceptionIntTest.java | 81 ++++++++++++++++++++ .../rabbitmq/RabbitMQRequeueIntTest.java | 4 +- ...abbitMQRequeueUnhandledExceptionIntTest.java | 81 ++++++++++++++++++++ 4 files changed, 167 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/02fd57d8/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index b535915..3a5a4ea 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -253,6 +253,9 @@ public class RabbitMQConsumer extends DefaultConsumer { channel.basicAck(deliveryTag, false); } } else { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class); // processing failed, then reject and handle the exception if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) { @@ -263,9 +266,6 @@ public class RabbitMQConsumer extends DefaultConsumer { channel.basicReject(deliveryTag, false); } } - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } } } http://git-wip-us.apache.org/repos/asf/camel/blob/02fd57d8/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueHandledExceptionIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueHandledExceptionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueHandledExceptionIntTest.java new file mode 100644 index 0000000..a6e1d73 --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueHandledExceptionIntTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Integration test to confirm REQUEUE header causes message not to be re-queued when an handled exception occurs. + */ +public class RabbitMQRequeueHandledExceptionIntTest extends CamelTestSupport { + public static final String ROUTING_KEY = "rk4"; + + @Produce(uri = "direct:rabbitMQ") + protected ProducerTemplate directProducer; + + @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?" + + "autoAck=false&queue=q4&routingKey=" + ROUTING_KEY) + private Endpoint rabbitMQEndpoint; + + @EndpointInject(uri = "mock:producing") + private MockEndpoint producingMockEndpoint; + + @EndpointInject(uri = "mock:consuming") + private MockEndpoint consumingMockEndpoint; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + from("direct:rabbitMQ") + .id("producingRoute") + .log("Sending message") + .inOnly(rabbitMQEndpoint) + .to(producingMockEndpoint); + + from(rabbitMQEndpoint) + .onException(Exception.class) + .handled(true) + .end() + .id("consumingRoute") + .log("Receiving message") + .inOnly(consumingMockEndpoint) + .throwException(new Exception("Simulated handled exception")); + } + }; + } + + @Test + public void testTrueRequeueHeaderWithHandleExceptionNotCausesRequeue() throws Exception { + producingMockEndpoint.expectedMessageCount(1); + consumingMockEndpoint.setMinimumExpectedMessageCount(1); + + directProducer.sendBodyAndHeader("Hello, World!", RabbitMQConstants.REQUEUE, true); + + producingMockEndpoint.assertIsSatisfied(); + consumingMockEndpoint.assertIsSatisfied(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/02fd57d8/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java index 97798fc..e0c8d74 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java @@ -34,8 +34,8 @@ public class RabbitMQRequeueIntTest extends CamelTestSupport { @Produce(uri = "direct:rabbitMQ") protected ProducerTemplate directProducer; - @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest" - + "&autoAck=false&queue=q4&routingKey=" + ROUTING_KEY) + @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?" + + "autoAck=false&queue=q4&routingKey=" + ROUTING_KEY) private Endpoint rabbitMQEndpoint; @EndpointInject(uri = "mock:producing") http://git-wip-us.apache.org/repos/asf/camel/blob/02fd57d8/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueUnhandledExceptionIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueUnhandledExceptionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueUnhandledExceptionIntTest.java new file mode 100644 index 0000000..4b7c6fa --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueUnhandledExceptionIntTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Integration test to confirm REQUEUE header causes message to be re-queued when an unhandled exception occurs. + */ +public class RabbitMQRequeueUnhandledExceptionIntTest extends CamelTestSupport { + public static final String ROUTING_KEY = "rk4"; + + @Produce(uri = "direct:rabbitMQ") + protected ProducerTemplate directProducer; + + @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?" + + "autoAck=false&queue=q4&routingKey=" + ROUTING_KEY) + private Endpoint rabbitMQEndpoint; + + @EndpointInject(uri = "mock:producing") + private MockEndpoint producingMockEndpoint; + + @EndpointInject(uri = "mock:consuming") + private MockEndpoint consumingMockEndpoint; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + from("direct:rabbitMQ") + .id("producingRoute") + .log("Sending message") + .inOnly(rabbitMQEndpoint) + .to(producingMockEndpoint); + + from(rabbitMQEndpoint) + .onException(Exception.class) + .handled(false) + .end() + .id("consumingRoute") + .log("Receiving message") + .inOnly(consumingMockEndpoint) + .throwException(new Exception("Simulated unhandled exception")); + } + }; + } + + @Test + public void testTrueRequeueHeaderWithUnandleExceptionCausesRequeue() throws Exception { + producingMockEndpoint.expectedMessageCount(1); + consumingMockEndpoint.setMinimumExpectedMessageCount(2); + + directProducer.sendBodyAndHeader("Hello, World!", RabbitMQConstants.REQUEUE, true); + + producingMockEndpoint.assertIsSatisfied(); + consumingMockEndpoint.assertIsSatisfied(); + } +}