Adding the ability to REQUEUE a message on the RabbitMQ server instead of REJECTING the message / sending it to the DLQ.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/464c06ea Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/464c06ea Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/464c06ea Branch: refs/heads/master Commit: 464c06eac4ab7155e016ab72541525d0d2a2e77e Parents: d5b5964 Author: Andrew Austin <andrew.aus...@wgu.edu> Authored: Sat Feb 21 17:16:24 2015 -0700 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Feb 22 16:17:32 2015 +0100 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConstants.java | 1 + .../component/rabbitmq/RabbitMQConsumer.java | 21 ++-- .../rabbitmq/RabbitMQRequeueIntTest.java | 111 +++++++++++++++++++ 3 files changed, 126 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/464c06ea/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java index 383ac5d..b1f4a0c 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java @@ -34,6 +34,7 @@ public final class RabbitMQConstants { public static final String EXPIRATION = "rabbitmq.EXPIRATION"; public static final String TIMESTAMP = "rabbitmq.TIMESTAMP"; public static final String APP_ID = "rabbitmq.APP_ID"; + public static final String REQUEUE = "rabbitmq.REQUEUE"; public static final String RABBITMQ_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange"; public static final String 
RABBITMQ_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; http://git-wip-us.apache.org/repos/asf/camel/blob/464c06ea/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 3201887..73aa814 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -184,6 +184,13 @@ public class RabbitMQConsumer extends DefaultConsumer { Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body); mergeAmqpProperties(exchange, properties); + Message msg; + if (exchange.hasOut()) { + msg = exchange.getOut(); + } else { + msg = exchange.getIn(); + } + boolean sendReply = properties.getReplyTo() != null; if (sendReply && !exchange.getPattern().isOutCapable()) { exchange.setPattern(ExchangePattern.InOut); @@ -200,12 +207,6 @@ public class RabbitMQConsumer extends DefaultConsumer { if (!exchange.isFailed()) { // processing success if (sendReply && exchange.getPattern().isOutCapable()) { - Message msg; - if (exchange.hasOut()) { - msg = exchange.getOut(); - } else { - msg = exchange.getIn(); - } AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() .headers(msg.getHeaders()) .correlationId(properties.getCorrelationId()) @@ -217,9 +218,15 @@ public class RabbitMQConsumer extends DefaultConsumer { channel.basicAck(deliveryTag, false); } } else { + boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class); // processing failed, then reject and handle the exception if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) { - 
channel.basicReject(deliveryTag, false); + log.trace("Rejecting receipt [delivery_tag={}] with requeue={}", deliveryTag, isRequeueHeaderSet); + if (isRequeueHeaderSet) { + channel.basicReject(deliveryTag, true); + } else { + channel.basicReject(deliveryTag, false); + } } if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); http://git-wip-us.apache.org/repos/asf/camel/blob/464c06ea/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java new file mode 100644 index 0000000..534f5b4 --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java @@ -0,0 +1,111 @@ +package org.apache.camel.component.rabbitmq; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Integration test to confirm REQUEUE header causes message to be re-queued instead of sent to DLQ. + * + * Created by Andrew Austin on 2/21/15. 
+ */ +public class RabbitMQRequeueIntTest extends CamelTestSupport { + public static final String ROUTING_KEY = "rk4"; + + @Produce(uri = "direct:rabbitMQ") + protected ProducerTemplate directProducer; + + @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest" + + "&autoAck=false&queue=q4&routingKey=" + ROUTING_KEY) + private Endpoint rabbitMQEndpoint; + + @EndpointInject(uri = "mock:producing") + private MockEndpoint producingMockEndpoint; + + @EndpointInject(uri = "mock:consuming") + private MockEndpoint consumingMockEndpoint; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + from("direct:rabbitMQ") + .id("producingRoute") + .log("Sending message") + .inOnly(rabbitMQEndpoint) + .to(producingMockEndpoint); + from(rabbitMQEndpoint) + .id("consumingRoute") + .log("Receiving message") + .inOnly(consumingMockEndpoint) + .choice() + .when(body().isEqualTo("requeue header false")) + .log("Setting REQUEUE flag to false") + .setHeader(RabbitMQConstants.REQUEUE, constant(false)) + .when(body().isEqualTo("requeue header true")) + .log("Setting REQUEUE flag to true") + .setHeader(RabbitMQConstants.REQUEUE, constant(true)) + .when(body().isEqualTo("non-boolean header")) + .log("Setting REQUEUE flag to non-boolean") + .setHeader(RabbitMQConstants.REQUEUE, constant(4l)) + .end() + .throwException(new Exception("Simulated exception")); + } + }; + } + + @Test + public void testNoRequeueHeaderCausesReject() throws Exception { + producingMockEndpoint.expectedMessageCount(1); + consumingMockEndpoint.expectedMessageCount(1); + + directProducer.sendBody("no requeue header"); + + Thread.sleep(100); + producingMockEndpoint.assertIsSatisfied(); + consumingMockEndpoint.assertIsSatisfied(); + } + + @Test + public void testNonBooleanRequeueHeaderCausesReject() throws Exception { + producingMockEndpoint.expectedMessageCount(1); + 
consumingMockEndpoint.expectedMessageCount(1); + + directProducer.sendBody("non-boolean header"); + + Thread.sleep(100); + producingMockEndpoint.assertIsSatisfied(); + consumingMockEndpoint.assertIsSatisfied(); + } + + @Test + public void testFalseRequeueHeaderCausesReject() throws Exception { + producingMockEndpoint.expectedMessageCount(1); + consumingMockEndpoint.expectedMessageCount(1); + + directProducer.sendBody("non-boolean header"); + + Thread.sleep(100); + producingMockEndpoint.assertIsSatisfied(); + consumingMockEndpoint.assertIsSatisfied(); + } + + @Test + public void testTrueRequeueHeaderCausesRequeue() throws Exception { + producingMockEndpoint.expectedMessageCount(1); + consumingMockEndpoint.setMinimumExpectedMessageCount(2); + + directProducer.sendBody("requeue header true"); + + Thread.sleep(100); + producingMockEndpoint.assertIsSatisfied(); + consumingMockEndpoint.assertIsSatisfied(); + } +}