Adding ability to REQUEUE a message on RabbitMQ server instead of REJECTING 
message / sending it to DLQ.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/464c06ea
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/464c06ea
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/464c06ea

Branch: refs/heads/master
Commit: 464c06eac4ab7155e016ab72541525d0d2a2e77e
Parents: d5b5964
Author: Andrew Austin <andrew.aus...@wgu.edu>
Authored: Sat Feb 21 17:16:24 2015 -0700
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Feb 22 16:17:32 2015 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConstants.java   |   1 +
 .../component/rabbitmq/RabbitMQConsumer.java    |  21 ++--
 .../rabbitmq/RabbitMQRequeueIntTest.java        | 111 +++++++++++++++++++
 3 files changed, 126 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/464c06ea/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
index 383ac5d..b1f4a0c 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
@@ -34,6 +34,7 @@ public final class RabbitMQConstants {
     public static final String EXPIRATION = "rabbitmq.EXPIRATION";
     public static final String TIMESTAMP = "rabbitmq.TIMESTAMP";
     public static final String APP_ID = "rabbitmq.APP_ID";
+    public static final String REQUEUE = "rabbitmq.REQUEUE";
     public static final String RABBITMQ_DEAD_LETTER_EXCHANGE = 
"x-dead-letter-exchange";
     public static final String RABBITMQ_DEAD_LETTER_ROUTING_KEY = 
"x-dead-letter-routing-key";
     

http://git-wip-us.apache.org/repos/asf/camel/blob/464c06ea/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 3201887..73aa814 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -184,6 +184,13 @@ public class RabbitMQConsumer extends DefaultConsumer {
             Exchange exchange = 
consumer.endpoint.createRabbitExchange(envelope, properties, body);
             mergeAmqpProperties(exchange, properties);
 
+            Message msg;
+            if (exchange.hasOut()) {
+                msg = exchange.getOut();
+            } else {
+                msg = exchange.getIn();
+            }
+
             boolean sendReply = properties.getReplyTo() != null;
             if (sendReply && !exchange.getPattern().isOutCapable()) {
                 exchange.setPattern(ExchangePattern.InOut);
@@ -200,12 +207,6 @@ public class RabbitMQConsumer extends DefaultConsumer {
             if (!exchange.isFailed()) {
                 // processing success
                 if (sendReply && exchange.getPattern().isOutCapable()) {
-                    Message msg;
-                    if (exchange.hasOut()) {
-                        msg = exchange.getOut();
-                    } else {
-                        msg = exchange.getIn();
-                    }
                     AMQP.BasicProperties replyProps = new 
AMQP.BasicProperties.Builder()
                             .headers(msg.getHeaders())
                             .correlationId(properties.getCorrelationId())
@@ -217,9 +218,15 @@ public class RabbitMQConsumer extends DefaultConsumer {
                     channel.basicAck(deliveryTag, false);
                 }
             } else {
+                boolean isRequeueHeaderSet = 
msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
                 // processing failed, then reject and handle the exception
                 if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
-                    channel.basicReject(deliveryTag, false);
+                    log.trace("Rejecting receipt [delivery_tag={}] with 
requeue={}", deliveryTag, isRequeueHeaderSet);
+                    if (isRequeueHeaderSet) {
+                        channel.basicReject(deliveryTag, true);
+                    } else {
+                        channel.basicReject(deliveryTag, false);
+                    }
                 }
                 if (exchange.getException() != null) {
                     getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());

http://git-wip-us.apache.org/repos/asf/camel/blob/464c06ea/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
new file mode 100644
index 0000000..534f5b4
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
@@ -0,0 +1,111 @@
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Integration test to confirm REQUEUE header causes message to be re-queued instead of sent to DLQ.
+ * A missing, false or non-boolean REQUEUE header is expected to result in a plain reject (see the test names).
+ * Created by Andrew Austin on 2/21/15.
+ */
+public class RabbitMQRequeueIntTest extends CamelTestSupport {
+    public static final String ROUTING_KEY = "rk4";
+
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directProducer;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
+            + "&autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
+    private Endpoint rabbitMQEndpoint;
+
+    @EndpointInject(uri = "mock:producing")
+    private MockEndpoint producingMockEndpoint;
+
+    @EndpointInject(uri = "mock:consuming")
+    private MockEndpoint consumingMockEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .log("Sending message")
+                        .inOnly(rabbitMQEndpoint)
+                        .to(producingMockEndpoint);
+                from(rabbitMQEndpoint)
+                        .id("consumingRoute")
+                        .log("Receiving message")
+                        .inOnly(consumingMockEndpoint)
+                        .choice()
+                        .when(body().isEqualTo("requeue header false"))
+                            .log("Setting REQUEUE flag to false")
+                            .setHeader(RabbitMQConstants.REQUEUE, constant(false))
+                        .when(body().isEqualTo("requeue header true"))
+                            .log("Setting REQUEUE flag to true")
+                            .setHeader(RabbitMQConstants.REQUEUE, constant(true))
+                        .when(body().isEqualTo("non-boolean header"))
+                            .log("Setting REQUEUE flag to non-boolean")
+                            .setHeader(RabbitMQConstants.REQUEUE, constant(4L))
+                        .end()
+                        .throwException(new Exception("Simulated exception")); // every consumed message fails
+            }
+        };
+    }
+
+    @Test
+    public void testNoRequeueHeaderCausesReject() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.expectedMessageCount(1);
+
+        directProducer.sendBody("no requeue header"); // matches no choice() branch, so no header is set
+
+        Thread.sleep(100); // NOTE(review): presumably leaves time for an unwanted redelivery to show up
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testNonBooleanRequeueHeaderCausesReject() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.expectedMessageCount(1);
+
+        directProducer.sendBody("non-boolean header"); // route sets REQUEUE to the long value 4
+
+        Thread.sleep(100); // NOTE(review): presumably leaves time for an unwanted redelivery to show up
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testFalseRequeueHeaderCausesReject() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.expectedMessageCount(1);
+
+        directProducer.sendBody("requeue header false"); // must match the branch that sets REQUEUE to false
+
+        Thread.sleep(100); // NOTE(review): presumably leaves time for an unwanted redelivery to show up
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testTrueRequeueHeaderCausesRequeue() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.setMinimumExpectedMessageCount(2); // original delivery plus at least one redelivery
+
+        directProducer.sendBody("requeue header true"); // route sets REQUEUE to true
+
+        Thread.sleep(100); // NOTE(review): presumably leaves time for the redelivery to arrive
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+}

Reply via email to