This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.4.x by this push:
     new 1679e96  CAMEL-15580: SJMS Batch Consumer startup race condition (#4306)
1679e96 is described below

commit 1679e965ee004a0031997bbbb725b9ce4f5036c3
Author: Viral Gohel <vgo...@redhat.com>
AuthorDate: Sun Sep 27 13:28:12 2020 +0530

    CAMEL-15580: SJMS Batch Consumer startup race condition (#4306)
---
 .../component/sjms/batch/SjmsBatchConsumer.java    |  2 +-
 .../sjms/batch/SjmsBatchConsumerTest.java          | 43 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 6452fc6..daed6d9 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -185,6 +185,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
                     // its success so prepare for exit
                     connection = localConnection;
+                    running.set(true);
 
                     final List<AtomicBoolean> triggers = new ArrayList<>();
                     for (int i = 0; i < consumerCount; i++) {
@@ -204,7 +205,6 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     }
 
                     LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
-                    running.set(true);
                     return;
                 } catch (Throwable e) {
                     // we failed so close the local connection as we create a new on next attempt
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index a2df865..4037369 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.sjms.batch;
 
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -440,6 +441,48 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         assertEquals("E", body.get(4));
     }
 
+    @Test
+    public void testStartupRaceCondition() throws Exception {
+        final int routeCount = 10;
+        final int consumerCount = 1;
+
+        List<String> queues = new ArrayList<>();
+
+        String queueNamePrefix = getQueueName();
+
+        // setup routeCount routes, each reading from its own queue but all writing to the same mock endpoint
+        for (int i = 0; i < routeCount; i++) {
+            String queueName = queueNamePrefix + "_" + i;
+            queues.add(queueName);
+            String routeId = "batchConsumer_" + i;
+            context.addRoutes(new RouteBuilder() {
+                public void configure() throws Exception {
+
+                    int completionTimeout = 1000;
+                    int completionSize = 1;
+
+                    fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy&keepAliveDelay=100&asyncStartListener=true",
+                            queueName, completionTimeout, completionSize, consumerCount)
+                            .routeId(routeId).autoStartup(true)
+                            .split(body())
+                            .to("mock:split");
+                }
+            });
+        }
+
+        context.start();
+
+        // expect to receive routeCount messages to the mock endpoint
+        MockEndpoint mockSplit = getMockEndpoint("mock:split");
+        mockSplit.setExpectedMessageCount(routeCount);
+
+        // send one message to all the queues
+        queues.forEach(queueName -> template.sendBody("sjms:queue:" + queueName, queueName));
+
+        assertMockEndpointsSatisfied();
+
+    }
+
     private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {
         Exchange exchange = mockEndpoint.getExchanges().get(0);
         assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());