Repository: camel
Updated Branches:
  refs/heads/master d83fbc581 -> 0832667f2


CAMEL-9505: RabbitMQConsumer doesn't use Camel ExceptionHandler BEFORE requeuing 
message


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0832667f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0832667f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0832667f

Branch: refs/heads/master
Commit: 0832667f2c43aee8f22606ea0ef672e1ec98a1ab
Parents: d83fbc5
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Wed Jan 13 14:05:34 2016 +0100
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Wed Jan 13 14:09:03 2016 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    |  6 +-
 .../RabbitMQRequeueHandledExceptionIntTest.java | 81 ++++++++++++++++++++
 .../rabbitmq/RabbitMQRequeueIntTest.java        |  4 +-
 ...abbitMQRequeueUnhandledExceptionIntTest.java | 81 ++++++++++++++++++++
 4 files changed, 167 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0832667f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index eaf2b6c..c3569e7 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -253,6 +253,9 @@ public class RabbitMQConsumer extends DefaultConsumer {
                     channel.basicAck(deliveryTag, false);
                 }
             } else {
+                if (exchange.getException() != null) {
+                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
+                }
                 boolean isRequeueHeaderSet = 
msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
                 // processing failed, then reject and handle the exception
                 if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
@@ -263,9 +266,6 @@ public class RabbitMQConsumer extends DefaultConsumer {
                         channel.basicReject(deliveryTag, false);
                     }
                 }
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0832667f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueHandledExceptionIntTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueHandledExceptionIntTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueHandledExceptionIntTest.java
new file mode 100644
index 0000000..a6e1d73
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueHandledExceptionIntTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Integration test to confirm REQUEUE header causes message not to be 
re-queued when a handled exception occurs.
+ */
+public class RabbitMQRequeueHandledExceptionIntTest extends CamelTestSupport {
+    public static final String ROUTING_KEY = "rk4";
+
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directProducer;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?"
+            + "autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
+    private Endpoint rabbitMQEndpoint;
+
+    @EndpointInject(uri = "mock:producing")
+    private MockEndpoint producingMockEndpoint;
+
+    @EndpointInject(uri = "mock:consuming")
+    private MockEndpoint consumingMockEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .log("Sending message")
+                        .inOnly(rabbitMQEndpoint)
+                        .to(producingMockEndpoint);
+
+                from(rabbitMQEndpoint)
+                        .onException(Exception.class)
+                        .handled(true)
+                        .end()
+                        .id("consumingRoute")
+                        .log("Receiving message")
+                        .inOnly(consumingMockEndpoint)
+                        .throwException(new Exception("Simulated handled 
exception"));
+            }
+        };
+    }
+
+    @Test
+    public void testTrueRequeueHeaderWithHandleExceptionNotCausesRequeue() 
throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.setMinimumExpectedMessageCount(1);
+
+        directProducer.sendBodyAndHeader("Hello, World!", 
RabbitMQConstants.REQUEUE, true);
+
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0832667f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
index 97798fc..e0c8d74 100644
--- 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
@@ -34,8 +34,8 @@ public class RabbitMQRequeueIntTest extends CamelTestSupport {
     @Produce(uri = "direct:rabbitMQ")
     protected ProducerTemplate directProducer;
 
-    @EndpointInject(uri = 
"rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
-            + "&autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
+    @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?"
+            + "autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
     private Endpoint rabbitMQEndpoint;
 
     @EndpointInject(uri = "mock:producing")

http://git-wip-us.apache.org/repos/asf/camel/blob/0832667f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueUnhandledExceptionIntTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueUnhandledExceptionIntTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueUnhandledExceptionIntTest.java
new file mode 100644
index 0000000..4b7c6fa
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueUnhandledExceptionIntTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Integration test to confirm REQUEUE header causes message to be re-queued 
when an unhandled exception occurs.
+ */
+public class RabbitMQRequeueUnhandledExceptionIntTest extends CamelTestSupport 
{
+    public static final String ROUTING_KEY = "rk4";
+
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directProducer;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?"
+            + "autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
+    private Endpoint rabbitMQEndpoint;
+
+    @EndpointInject(uri = "mock:producing")
+    private MockEndpoint producingMockEndpoint;
+
+    @EndpointInject(uri = "mock:consuming")
+    private MockEndpoint consumingMockEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .log("Sending message")
+                        .inOnly(rabbitMQEndpoint)
+                        .to(producingMockEndpoint);
+
+                from(rabbitMQEndpoint)
+                        .onException(Exception.class)
+                        .handled(false)
+                        .end()
+                        .id("consumingRoute")
+                        .log("Receiving message")
+                        .inOnly(consumingMockEndpoint)
+                        .throwException(new Exception("Simulated unhandled 
exception"));
+            }
+        };
+    }
+
+    @Test
+    public void testTrueRequeueHeaderWithUnandleExceptionCausesRequeue() 
throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.setMinimumExpectedMessageCount(2);
+
+        directProducer.sendBodyAndHeader("Hello, World!", 
RabbitMQConstants.REQUEUE, true);
+
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+}

Reply via email to