Repository: camel Updated Branches: refs/heads/master 753100b4c -> 664637fce
CAMEL-10239: Provide implementation for publisher acknowledgement together with basic.return Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff96d5b2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff96d5b2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff96d5b2 Branch: refs/heads/master Commit: ff96d5b22a023119f72d91c598509302a9f77f3a Parents: 753100b Author: Florian Gessner <flo.gess...@gmail.com> Authored: Thu Aug 11 20:54:48 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Aug 12 08:49:22 2016 +0200 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQEndpoint.java | 24 +++++++++-- .../rabbitmq/RabbitMQMessagePublisher.java | 33 ++++++++++++--- .../rabbitmq/RabbitMQProducerIntTest.java | 42 +++++++++++++++++++- 3 files changed, 89 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ff96d5b2/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index bf40766..cfd9a06 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -150,6 +150,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { private boolean publisherAcknowledgements; @UriParam(label = "producer") private long publisherAcknowledgementsTimeout; + @UriParam(label = "producer") + private boolean guaranteedDeliveries; // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq private boolean useMessageIDAsCorrelationID = true; // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq @@ -411,7 +413,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { /** * If true the queue will not be bound to the exchange after declaring it - * @return + * @return */ public boolean isSkipQueueBind() { return skipQueueBind; @@ -420,7 +422,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { public void setSkipQueueBind(boolean skipQueueBind) { this.skipQueueBind = skipQueueBind; } - + /** * This can be used if we need to declare the queue but not the exchange */ @@ -799,7 +801,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { } /** - * When true and an inOut Exchange failed on the consumer side send the caused Exception back in the response + * When true and an inOut Exchange failed on the consumer side send the caused Exception back in the response */ public void setTransferException(boolean transferException) { this.transferException = transferException; @@ -832,6 +834,22 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { } /** + * When true, an exception will be thrown when the message cannot be delivered (basic.return) and the message is + * marked as mandatory. + * PublisherAcknowledgement will also be activated in this case + * + * See also <a href=https://www.rabbitmq.com/confirms.html">publisher acknowledgements</a> - When will messages be + * confirmed? + */ + public boolean isGuaranteedDeliveries() { + return guaranteedDeliveries; + } + + public void setGuaranteedDeliveries(boolean guaranteedDeliveries) { + this.guaranteedDeliveries = guaranteedDeliveries; + } + + /** * Get replyToType for inOut exchange */ public String getReplyToType() { http://git-wip-us.apache.org/repos/asf/camel/blob/ff96d5b2/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java index bc78665..15f69ff 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.ReturnListener; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; @@ -37,6 +38,13 @@ import org.slf4j.LoggerFactory; * A method object for publishing to RabbitMQ */ public class RabbitMQMessagePublisher { + private static final ReturnListener GUARANTEED_DELIVERY_RETURN_LISTENER = new ReturnListener() { + @Override + public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { + throw new RuntimeCamelException("Delivery failed for exchange " + exchange + " and routing key " + routingKey + "; replyCode = " + replyCode + " replyText = " + replyText); + } + }; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessagePublisher.class); private final Exchange camelExchange; private final Channel channel; @@ -60,7 +68,7 @@ public class RabbitMQMessagePublisher { LOG.debug("Removing the {} header", RabbitMQEndpoint.SERIALIZE_HEADER); message.getHeaders().remove(RabbitMQEndpoint.SERIALIZE_HEADER); } - + return message; } @@ -86,7 +94,7 @@ public class RabbitMQMessagePublisher { throw new RuntimeCamelException(e); } } - + publishToRabbitMQ(properties, body); } @@ -98,17 +106,30 @@ public class RabbitMQMessagePublisher { LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId()); - if (endpoint.isPublisherAcknowledgements()) { + if (isPublisherAcknowledgements()) { channel.confirmSelect(); } + if (endpoint.isGuaranteedDeliveries()) { + channel.addReturnListener(GUARANTEED_DELIVERY_RETURN_LISTENER); - channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body); + } - if (endpoint.isPublisherAcknowledgements()) { - waitForConfirmation(); + try { + channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body); + if (isPublisherAcknowledgements()) { + waitForConfirmation(); + } + } finally { + if (endpoint.isGuaranteedDeliveries()) { + channel.removeReturnListener(GUARANTEED_DELIVERY_RETURN_LISTENER); + } } } + private boolean isPublisherAcknowledgements() { + return endpoint.isPublisherAcknowledgements() || endpoint.isGuaranteedDeliveries(); + } + private void waitForConfirmation() throws IOException { try { LOG.debug("Waiting for publisher acknowledgements for {}ms", endpoint.getPublisherAcknowledgementsTimeout()); http://git-wip-us.apache.org/repos/asf/camel/blob/ff96d5b2/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java index fded387..5be1be1 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java @@ -29,6 +29,7 @@ import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.After; @@ -42,6 +43,9 @@ public class RabbitMQProducerIntTest extends CamelTestSupport { private static final String BASIC_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, ROUTE); private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI + "&mandatory=true&publisherAcknowledgements=true"; private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&publisherAcknowledgements=true"; + private static final String GUARANTEED_DELIVERY_URI = BASIC_URI + "&mandatory=true&guaranteedDeliveries=true"; + private static final String GUARANTEED_DELIVERY_BAD_ROUTE_NOT_MANDATORY_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&guaranteedDeliveries=true"; + private static final String GUARANTEED_DELIVERY_BAD_ROUTE_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&mandatory=true&guaranteedDeliveries=true"; @Produce(uri = "direct:start") protected ProducerTemplate template; @@ -52,6 +56,15 @@ public class RabbitMQProducerIntTest extends CamelTestSupport { @Produce(uri = "direct:start-with-confirms-bad-route") protected ProducerTemplate templateWithConfirmsAndBadRoute; + @Produce(uri = "direct:start-with-guaranteed-delivery") + protected ProducerTemplate templateWithGuranteedDelivery; + + @Produce(uri = "direct:start-with-guaranteed-delivery-bad-route") + protected ProducerTemplate templateWithGuranteedDeliveryAndBadRoute; + + @Produce(uri = "direct:start-with-guaranteed-delivery-bad-route-but-not-mandatory") + protected ProducerTemplate templateWithGuranteedDeliveryBadRouteButNotMandatory; + private Connection connection; private Channel channel; @@ -64,6 +77,9 @@ public class RabbitMQProducerIntTest extends CamelTestSupport { from("direct:start").to(BASIC_URI); from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI); from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI); + from("direct:start-with-guaranteed-delivery").to(GUARANTEED_DELIVERY_URI); + from("direct:start-with-guaranteed-delivery-bad-route").to(GUARANTEED_DELIVERY_BAD_ROUTE_URI); + from("direct:start-with-guaranteed-delivery-bad-route-but-not-mandatory").to(GUARANTEED_DELIVERY_BAD_ROUTE_NOT_MANDATORY_URI); } }; } @@ -121,6 +137,31 @@ public class RabbitMQProducerIntTest extends CamelTestSupport { assertThatBodiesReceivedIn(received); } + @Test + public void shouldSuccessfullyProduceMessageWhenGuaranteedDeliveryIsActivatedAndMessageIsMarkedAsMandatory() throws InterruptedException, IOException, TimeoutException { + final List<String> received = new ArrayList<>(); + channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received)); + + templateWithGuranteedDelivery.sendBodyAndHeader("publisher ack message", RabbitMQConstants.EXCHANGE_NAME, "ex1"); + + assertThatBodiesReceivedIn(received, "publisher ack message"); + } + + @Test(expected = RuntimeCamelException.class) + public void shouldFailIfMessageIsMarkedAsMandatoryAndGuaranteedDeliveryIsActiveButNoQueueIsBound() { + templateWithGuranteedDeliveryAndBadRoute.sendBody("publish with ack and return message"); + } + + @Test + public void shouldSuccessfullyProduceMessageWhenGuaranteedDeliveryIsActivatedOnABadRouteButMessageIsNotMandatory() throws InterruptedException, IOException, TimeoutException { + final List<String> received = new ArrayList<>(); + channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received)); + + templateWithGuranteedDeliveryBadRouteButNotMandatory.sendBodyAndHeader("publisher ack message", RabbitMQConstants.EXCHANGE_NAME, "ex1"); + + assertThatBodiesReceivedIn(received); + } + private Connection createTestConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); @@ -148,4 +189,3 @@ public class RabbitMQProducerIntTest extends CamelTestSupport { } } } -