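Fixed test: JmsRequestReplyTempQueueMultipleConsumersTest now submits its messages through a thread pool created and shut down via the CamelContext's ExecutorServiceManager, and sends to direct:start instead of seda:start.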
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/37ce3d91 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/37ce3d91 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/37ce3d91 Branch: refs/heads/camel-2.16.x Commit: 37ce3d913e3670a7e871ce7447ec97768578a014 Parents: 543e4d3 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Nov 20 08:19:03 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Nov 20 08:19:28 2015 +0100 ---------------------------------------------------------------------- ...uestReplyTempQueueMultipleConsumersTest.java | 39 ++++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/37ce3d91/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java index decf610..a293de3 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -42,44 +41,52 @@ public class JmsRequestReplyTempQueueMultipleConsumersTest extends CamelTestSupp private final Map<String, 
AtomicInteger> msgsPerThread = new ConcurrentHashMap<String, AtomicInteger>(); private PooledConnectionFactory connectionFactory; - + private ExecutorService executorService; + @Test public void testMultipleConsumingThreads() throws Exception { - doSendMessages(1000, 5); - assertTrue("Expected multiple consuming threads, but only found: " + msgsPerThread.keySet().size(), + executorService = context.getExecutorServiceManager().newFixedThreadPool(this, "test", 5); + + doSendMessages(1000); + + assertTrue("Expected multiple consuming threads, but only found: " + msgsPerThread.keySet().size(), msgsPerThread.keySet().size() > 1); + + context.getExecutorServiceManager().shutdown(executorService); } @Test public void testTempQueueRefreshed() throws Exception { - doSendMessages(500, 5); - connectionFactory.clear(); - doSendMessages(100, 5); + executorService = context.getExecutorServiceManager().newFixedThreadPool(this, "test", 5); + + doSendMessages(100); connectionFactory.clear(); - doSendMessages(100, 5); + Thread.sleep(1000); + doSendMessages(100); connectionFactory.clear(); - doSendMessages(100, 5); + Thread.sleep(1000); + doSendMessages(100); + + context.getExecutorServiceManager().shutdown(executorService); } - private void doSendMessages(int files, int poolSize) throws Exception { + private void doSendMessages(int files) throws Exception { resetMocks(); MockEndpoint mockEndpoint = getMockEndpoint("mock:result"); mockEndpoint.expectedMessageCount(files); mockEndpoint.expectsNoDuplicates(body()); - ExecutorService executor = Executors.newFixedThreadPool(poolSize); for (int i = 0; i < files; i++) { final int index = i; - executor.submit(new Callable<Object>() { + executorService.submit(new Callable<Object>() { public Object call() throws Exception { - template.sendBody("seda:start", "Message " + index); + template.sendBody("direct:start", "Message " + index); return null; } }); } assertMockEndpointsSatisfied(20, TimeUnit.SECONDS); - executor.shutdownNow(); } protected 
CamelContext createCamelContext() throws Exception { @@ -96,7 +103,7 @@ public class JmsRequestReplyTempQueueMultipleConsumersTest extends CamelTestSupp return new RouteBuilder() { @Override public void configure() throws Exception { - from("seda:start").inOut("jms:queue:foo?replyToConcurrentConsumers=10&replyToMaxConcurrentConsumers=20&recoveryInterval=10").process(new Processor() { + from("direct:start").inOut("jms:queue:foo?replyToConcurrentConsumers=10&replyToMaxConcurrentConsumers=20&recoveryInterval=10").process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String threadName = Thread.currentThread().getName(); @@ -111,7 +118,7 @@ public class JmsRequestReplyTempQueueMultipleConsumersTest extends CamelTestSupp } }).to("mock:result"); - from("jms:queue:foo?concurrentConsumers=20&recoveryInterval=10").setBody(simple("Reply >>> ${body}")); + from("jms:queue:foo?concurrentConsumers=10&recoveryInterval=10").setBody(simple("Reply >>> ${body}")); } }; }