This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit f24071a331c0b2a50e57d1f5ef52e76fa5e408a6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Jan 31 16:54:58 2021 +0100 CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grown deep and lead to stack overflow error. --- .../interceptor/TransactedStackSizeBreakOnExceptionTest.java | 2 +- .../interceptor/TransactedStackSizeParallelProcessingTest.java | 10 ++++------ .../camel/spring/interceptor/TransactedStackSizeTest.java | 4 ++-- .../java/org/apache/camel/processor/MulticastProcessor.java | 3 ++- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java index 1eb29f2..1a6b5b6 100644 --- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java @@ -49,7 +49,7 @@ public class TransactedStackSizeBreakOnExceptionTest extends TransactionClientDa int.class); sizes[i] = size; Assertions.assertTrue(size < 100, "Stackframe should be < 100"); - log.info("#{} size {}", i, size); + log.debug("#{} size {}", i, size); } int prev = sizes[0]; diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java index 739a9ed..729c78d 100644 --- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java @@ -18,17 +18,15 @@ package org.apache.camel.spring.interceptor; import org.apache.camel.builder.RouteBuilder; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.RepeatedTest; -import org.junit.jupiter.api.Test; public class TransactedStackSizeParallelProcessingTest extends TransactionClientDataSourceSupport { private int total = 100; private static final boolean PRINT_STACK_TRACE = false; - @Disabled("Flaky - May report 101 or 102 messages") - @RepeatedTest(value = 100) + // to test for flaky when using parallel processing then set this to 100 + @RepeatedTest(value = 1) public void testStackSize() throws Exception { getMockEndpoint("mock:line").expectedMessageCount(total); getMockEndpoint("mock:line").assertNoDuplicates(body()); @@ -49,11 +47,11 @@ public class TransactedStackSizeParallelProcessingTest extends TransactionClient int.class); sizes[i] = size; Assertions.assertTrue(size < 100, "Stackframe should be < 100"); - log.info("#{} size {}", i, size); + log.debug("#{} size {}", i, size); } int size = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize", int.class); sizes[total] = size; - log.info("#{} size {}", total, size); + log.debug("#{} size {}", total, size); int prev = sizes[0]; // last may be shorter, so use total - 1 diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java index 2f0bf71..984121b 100644 --- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java @@ -46,11 +46,11 @@ public class TransactedStackSizeTest extends TransactionClientDataSourceSupport int.class); sizes[i] = size; Assertions.assertTrue(size < 100, "Stackframe should be < 100"); - log.info("#{} size {}", i, size); + log.debug("#{} size {}", i, size); } int size = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize", int.class); sizes[total] = size; - log.info("#{} size {}", total, size); + log.debug("#{} size {}", total, size); int prev = sizes[0]; // last may be shorter, so use total - 1 diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index f005dc9..948f514 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -288,7 +288,8 @@ public class MulticastProcessor extends AsyncProcessorSupport // must handle this specially in a while loop structure to ensure the strackframe does not grow deeper // the reactive mode will execute each sub task in its own runnable task which is scheduled on the reactive executor // which is how the routing engine normally operates - MulticastTask state = exchange.isTransacted() + // if we have parallel processing enabled then we cannot run in transacted mode (requires synchronous processing via same thread) + MulticastTask state = !isParallelProcessing() && exchange.isTransacted() ? new MulticastTransactedTask(exchange, pairs, callback) : new MulticastReactiveTask(exchange, pairs, callback); if (isParallelProcessing()) { executorService.submit(() -> reactiveExecutor.schedule(state));