Repository: camel Updated Branches: refs/heads/camel-2.18.x 82dec08fc -> 01658bf2a refs/heads/master 77678ceec -> 222817838
Add test to check if requested direct reply messages are received Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/22281783 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/22281783 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/22281783 Branch: refs/heads/master Commit: 222817838c02616a35dd759470d550f7f2edc33c Parents: 63b0dbd Author: Roman Kalashnikov <roman_kalashni...@epam.com> Authored: Fri Mar 17 14:33:20 2017 +0300 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Mar 22 21:31:28 2017 +0100 ---------------------------------------------------------------------- .../RabbitMQConsumerIntTestReplyTo.java | 115 +++++++++++++++++++ 1 file changed, 115 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/22281783/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTestReplyTo.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTestReplyTo.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTestReplyTo.java new file mode 100644 index 0000000..39e0f7b --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTestReplyTo.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Before; +import org.junit.Test; + +/** + * Integration test to check if requested direct reply messages are received + */ +public class RabbitMQConsumerIntTestReplyTo extends AbstractRabbitMQIntTest { + + private static final String EXCHANGE = "ex_reply"; + private static final String ROUTING_KEY = "testreply"; + private static final String REQUEST = "Knock! Knock!"; + private static final String REPLY = "Hello world"; + private static final String QUEUE = "amq.rabbitmq.reply-to"; + + @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?routingKey=" + ROUTING_KEY) + private Endpoint from; + + private Connection connection; + private Channel channel; + + @Before + public void setUpRabbitMQ() throws Exception { + connection = connection(); + channel = connection.createChannel(); +// channel.queueDeclare("sammyq", false, false, true, null); +// channel.queueBind("sammyq", EXCHANGE, ROUTE); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + context().setTracing(true); + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + log.info("Building routes..."); + + from(from) + .log(body().toString()) + .setBody(simple(REPLY)); + } + }; + } + + @Test + public void replyMessageIsReceived() throws IOException, TimeoutException, InterruptedException { + final List<String> received = new ArrayList<>(); + + AMQP.BasicProperties.Builder prop = new AMQP.BasicProperties.Builder(); + prop.replyTo(QUEUE); + + channel.basicConsume(QUEUE, true, new ArrayPopulatingConsumer(received)); + channel.basicPublish(EXCHANGE, ROUTING_KEY, prop.build(), REQUEST.getBytes()); + + assertThatBodiesReceivedIn(received, REPLY); + } + + private void assertThatBodiesReceivedIn(final List<String> received, final String... expected) throws InterruptedException { + Thread.sleep(500); + + assertListSize(received, expected.length); + for (String body : expected) { + assertEquals(body, received.get(0)); + } + } + + private class ArrayPopulatingConsumer extends DefaultConsumer { + private final List<String> received; + + ArrayPopulatingConsumer(final List<String> received) { + super(RabbitMQConsumerIntTestReplyTo.this.channel); + this.received = received; + } + + @Override + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) throws IOException { + received.add(new String(body)); + } + } + +} +