Add unit test to support scenario concurrentConsumers
/*
 * Provenance (from the commit notification this file was extracted from):
 *   Project:   http://git-wip-us.apache.org/repos/asf/camel/repo
 *   Commit:    eee057cb9f967bca5322244664ba0416c7682d6a (parent 0f11a6f), branch refs/heads/camel-2.12.x
 *   Author:    Charles Moulliard, authored Mon Mar 17 11:21:08 2014 +0100
 *   Committer: Willem Jiang, committed Mon Mar 31 14:20:53 2014 +0800
 *   Change:    2 files changed, 163 insertions(+)
 *   File:      components/camel-sjms/src/test/java/org/apache/camel/component/sjms/CamelJmsTestHelper.java
 */
package org.apache.camel.component.sjms;

import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.util.FileUtil;

/**
 * A helper for unit testing with Apache ActiveMQ as embedded JMS broker.
 * <p>
 * Every factory method targets a {@code vm://} broker whose name carries a
 * fresh sequence number, and wraps the ActiveMQ factory in a pooled one.
 */
public final class CamelJmsTestHelper {

    /** Upper bound on connections held by each pooled factory. */
    private static final int MAX_POOLED_CONNECTIONS = 8;

    /** Source of the unique suffix used in broker names and data directories. */
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    private CamelJmsTestHelper() {
        // utility class, not meant to be instantiated
    }

    /**
     * Creates a pooled connection factory for a non-persistent broker using
     * no extra URL options.
     *
     * @return the pooled connection factory
     */
    public static ConnectionFactory createConnectionFactory() {
        return createConnectionFactory(null);
    }

    /**
     * Creates a pooled connection factory for a non-persistent broker.
     *
     * @param options extra query options appended to the broker URL
     *                (for example {@code "a=b&c=d"}); {@code null} or empty for none
     * @return the pooled connection factory
     */
    public static ConnectionFactory createConnectionFactory(String options) {
        // using a unique broker name improves testing when running the entire test suite in the same JVM
        int id = COUNTER.incrementAndGet();
        String url = appendOptions(
                "vm://test-broker-" + id + "?broker.persistent=false&broker.useJmx=false", options);

        // When using asyncSend, producers will not be guaranteed to send in the order we
        // have in the tests (which may be confusing for queues) so we need this set to false.
        // Another way of guaranteeing order is to use persistent messages or transactions.
        return createPooledFactory(url, false);
    }

    /**
     * Creates a pooled connection factory for a persistent broker using no
     * extra URL options.
     *
     * @return the pooled connection factory
     */
    public static ConnectionFactory createPersistentConnectionFactory() {
        return createPersistentConnectionFactory(null);
    }

    /**
     * Creates a pooled connection factory for a persistent broker. The broker
     * stores its data under {@code target/activemq-data-<id>}; that directory
     * is removed first so it is empty on startup.
     *
     * @param options extra query options appended to the broker URL;
     *                {@code null} or empty for none
     * @return the pooled connection factory
     */
    public static ConnectionFactory createPersistentConnectionFactory(String options) {
        // using a unique broker name improves testing when running the entire test suite in the same JVM
        int id = COUNTER.incrementAndGet();

        // use an unique data directory in target
        String dir = "target/activemq-data-" + id;

        // remove dir so its empty on startup
        FileUtil.removeDir(new File(dir));

        String url = appendOptions(
                "vm://test-broker-" + id
                        + "?broker.persistent=true&broker.useJmx=false&broker.dataDirectory=" + dir,
                options);
        return createPooledFactory(url, true);
    }

    /**
     * Appends caller-supplied query options to a broker URL that already has
     * a query string. A {@code null} or empty value leaves the URL untouched,
     * so no dangling {@code &} is produced.
     */
    private static String appendOptions(String url, String options) {
        if (options == null || options.isEmpty()) {
            return url;
        }
        return url + "&" + options;
    }

    /**
     * Builds the ActiveMQ factory for the given URL, applies the settings
     * shared by all test factories and wraps the result in a pool.
     */
    private static ConnectionFactory createPooledFactory(String url, boolean useAsyncSend) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // optimize AMQ to be as fast as possible so unit testing is quicker
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setOptimizeAcknowledge(true);
        connectionFactory.setOptimizedMessageDispatch(true);
        connectionFactory.setUseAsyncSend(useAsyncSend);
        connectionFactory.setAlwaysSessionAsync(false);

        // use a pooled connection factory
        PooledConnectionFactory pooled = new PooledConnectionFactory(connectionFactory);
        pooled.setMaxConnections(MAX_POOLED_CONNECTIONS);
        return pooled;
    }
}
/*
 * Provenance (from the commit notification this file was extracted from):
 *   File: components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConcurrentConsumerTest.java
 *   New file (index 0000000..fc18efc), 80 lines added.
 */
package org.apache.camel.component.sjms.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sjms.support.JmsTestSupport;
import org.junit.Test;

/**
 * Concurrent consumer with JMSReply test.
 * <p>
 * Several request/reply messages are sent in parallel through an
 * {@code sjms} endpoint configured with {@code consumerCount=5}, and each
 * caller must get back the reply that belongs to its own request.
 */
public class InOutConcurrentConsumerTest extends JmsTestSupport {

    @EndpointInject(uri = "mock:result")
    MockEndpoint result;

    @Test
    public void testConcurrent() throws Exception {
        doSendMessages(10, 5);
    }

    /**
     * Sends {@code messages} requests to {@code direct:start} from a pool of
     * {@code poolSize} threads and verifies that every reply matches its request.
     *
     * @param messages number of requests to send
     * @param poolSize number of sender threads
     * @throws Exception if sending fails or an expectation is not met
     */
    private void doSendMessages(int messages, int poolSize) throws Exception {
        result.expectedMessageCount(messages);
        result.expectsNoDuplicates(body());

        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            final List<Future<String>> futures = new ArrayList<Future<String>>(messages);
            for (int i = 0; i < messages; i++) {
                final int index = i;
                futures.add(executor.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        return template.requestBody("direct:start", "Message " + index, String.class);
                    }
                }));
            }

            assertMockEndpointsSatisfied();

            // futures are stored in submission order, so position i holds the reply to "Message i"
            for (int i = 0; i < futures.size(); i++) {
                String out = futures.get(i).get();
                assertEquals("Bye Message " + i, out);
            }
        } finally {
            // always release the sender threads, even when an assertion above fails
            executor.shutdownNow();
        }
    }

    @Override
    protected CamelContext createCamelContext() throws Exception {
        return super.createCamelContext();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // requester side: send to queue "a", then pass the reply on to the mock
                from("direct:start")
                    .to("sjms:a?consumerCount=5&exchangePattern=InOut&namedReplyTo=myResponse")
                    .to("mock:result");

                // responder side: consume from queue "a" and produce the reply
                from("sjms:a?consumerCount=5&exchangePattern=InOut&namedReplyTo=myResponse")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            String body = exchange.getIn().getBody(String.class);
                            // sleep a little to simulate heavy work and force concurrency processing
                            Thread.sleep(1000);
                            exchange.getOut().setBody("Bye " + body);
                            exchange.getOut().setHeader("threadName", Thread.currentThread().getName());
                            System.out.println("Thread ID : " + Thread.currentThread().getName());
                        }
                    });
            }
        };
    }

}