Repository: camel Updated Branches: refs/heads/master b7f4e7ac2 -> bcd00fa15
CAMEL-9399 Implementation and happy path integration test Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e3751db8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e3751db8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e3751db8 Branch: refs/heads/master Commit: e3751db814bc418fe265159992d7c5df678ebbd2 Parents: b7f4e7a Author: Miloš Milivojević <mmilivoje...@deployinc.com> Authored: Mon Dec 21 10:38:29 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Dec 21 15:34:24 2015 +0100 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQEndpoint.java | 51 ++++++++++- .../rabbitmq/RabbitMQProducerIntTest.java | 96 ++++++++++++++++---- 2 files changed, 125 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e3751db8/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index bcf0e7f..f7a02f4 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.rabbitmq; +import javax.net.ssl.TrustManager; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -34,8 +35,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; -import javax.net.ssl.TrustManager; - import
com.rabbitmq.client.AMQP; import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; @@ -43,7 +42,6 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.LongString; - import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -159,6 +157,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint { private long requestTimeoutCheckerInterval = 1000; @UriParam private boolean transferException; + @UriParam(label = "producer") + private boolean publisherAcknowledgements; + @UriParam(label = "producer") + private long publisherAcknowledgementsTimeout; // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq private boolean useMessageIDAsCorrelationID = true; // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq @@ -166,7 +168,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint { // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq private String replyTo; - private RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter(); + private final RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter(); public RabbitMQEndpoint() { @@ -305,7 +307,16 @@ public class RabbitMQEndpoint extends DefaultEndpoint { Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, isImmediate(), Boolean.class); LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId()); + + if (isPublisherAcknowledgements()) { + channel.confirmSelect(); + } + channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body); + + if (isPublisherAcknowledgements()) { + waitForConfirmationFor(channel, camelExchange); + } } /** @@ -318,6 +329,16 @@ public class RabbitMQEndpoint extends DefaultEndpoint { exchangeName 
= getExchangeName(); } return exchangeName; + } + + private void waitForConfirmationFor(final Channel channel, final Exchange camelExchange) throws IOException { + try { + LOG.debug("Waiting for publisher acknowledgements for {}ms", getPublisherAcknowledgementsTimeout()); + channel.waitForConfirmsOrDie(getPublisherAcknowledgementsTimeout()); + } catch (InterruptedException | TimeoutException e) { + LOG.warn("Acknowledgement error for {}", camelExchange); + throw new RuntimeCamelException(e); + } } @Override @@ -978,6 +999,28 @@ public class RabbitMQEndpoint extends DefaultEndpoint { } /** + * When true, the message will be published with <a href="https://www.rabbitmq.com/confirms.html">publisher acknowledgements</a> turned on + */ + public boolean isPublisherAcknowledgements() { + return publisherAcknowledgements; + } + + public void setPublisherAcknowledgements(final boolean publisherAcknowledgements) { + this.publisherAcknowledgements = publisherAcknowledgements; + } + + /** + * The amount of time in milliseconds to wait for a basic.ack response from RabbitMQ server + */ + public long getPublisherAcknowledgementsTimeout() { + return publisherAcknowledgementsTimeout; + } + + public void setPublisherAcknowledgementsTimeout(final long publisherAcknowledgementsTimeout) { + this.publisherAcknowledgementsTimeout = publisherAcknowledgementsTimeout; + } + + /** * Get replyToType for inOut exchange */ public String getReplyToType() { http://git-wip-us.apache.org/repos/asf/camel/blob/e3751db8/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java index 377f8a9..2684c20 100644 --- 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java @@ -27,67 +27,127 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; - -import org.apache.camel.Endpoint; -import org.apache.camel.EndpointInject; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; import org.junit.Test; public class RabbitMQProducerIntTest extends CamelTestSupport { private static final String EXCHANGE = "ex1"; + private static final String ROUTE = "route1"; + private static final String BASIC_URI_FORMAT = "rabbitmq:localhost:5672/%s?routingKey=%s&username=cameltest&password=cameltest&skipQueueDeclare=true"; + private static final String BASIC_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, ROUTE); + private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI + "&mandatory=true&publisherAcknowledgements=true"; + private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&publisherAcknowledgements=true"; @Produce(uri = "direct:start") protected ProducerTemplate template; + @Produce(uri = "direct:start-with-confirms") + protected ProducerTemplate templateWithConfirms; - @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?routingKey=route1&username=cameltest&password=cameltest") - private Endpoint to; + @Produce(uri = "direct:start-with-confirms-bad-route") + protected ProducerTemplate templateWithConfirmsAndBadRoute; + private Connection connection; + private Channel channel; @Override protected RouteBuilder createRouteBuilder() throws Exception { + 
context().setTracing(true); return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to(to); + from("direct:start").to(BASIC_URI); + from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI); + from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI); } }; } + @Before + public void setUpRabbitMQ() throws Exception { + connection = createTestConnection(); + channel = connection.createChannel(); + channel.queueDeclare("sammyq", false, false, true, null); + channel.queueBind("sammyq", EXCHANGE, ROUTE); + } + + @After + public void tearDownRabbitMQ() throws Exception { + channel.abort(); + connection.abort(); + } + @Test public void producedMessageIsReceived() throws InterruptedException, IOException, TimeoutException { + final List<String> received = new ArrayList<>(); + channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received)); + + template.sendBodyAndHeader("new message", RabbitMQConstants.EXCHANGE_NAME, "ex1"); + + assertThatBodiesReceivedIn(received, "new message"); + } + private void assertThatBodiesReceivedIn(final List<String> received, final String... 
expected) throws InterruptedException { + Thread.sleep(500); + + assertListSize(received, expected.length); + for (String body : expected) { + assertEquals(body, received.get(0)); + } + } + + @Test + public void producedMessageIsReceivedWhenPublisherAcknowledgementsAreEnabled() throws InterruptedException, IOException, TimeoutException { + final List<String> received = new ArrayList<>(); + channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received)); + + templateWithConfirms.sendBodyAndHeader("publisher ack message", RabbitMQConstants.EXCHANGE_NAME, "ex1"); + + assertThatBodiesReceivedIn(received, "publisher ack message"); + } + + @Test + public void producedMessageIsReceivedWhenPublisherAcknowledgementsAreEnabledAndBadRoutingKeyIsUsed() throws InterruptedException, IOException, TimeoutException { + final List<String> received = new ArrayList<>(); + channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received)); + + templateWithConfirmsAndBadRoute.sendBody("publisher ack message"); + + assertThatBodiesReceivedIn(received); + } + + private Connection createTestConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("cameltest"); factory.setPassword("cameltest"); factory.setVirtualHost("/"); - Connection conn = factory.newConnection(); + return factory.newConnection(); + } - final List<Envelope> received = new ArrayList<Envelope>(); + private class ArrayPopulatingConsumer extends DefaultConsumer { + private final List<String> received; + + public ArrayPopulatingConsumer(final List<String> received) { + super(RabbitMQProducerIntTest.this.channel); + this.received = received; + } - Channel channel = conn.createChannel(); - channel.queueDeclare("sammyq", false, false, true, null); - channel.queueBind("sammyq", EXCHANGE, "route1"); - channel.basicConsume("sammyq", true, new DefaultConsumer(channel) { @Override public void 
handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - received.add(envelope); + received.add(new String(body)); } - }); - - template.sendBodyAndHeader("new message", RabbitMQConstants.EXCHANGE_NAME, "ex1"); - Thread.sleep(500); - assertEquals(1, received.size()); } }