This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f24071a331c0b2a50e57d1f5ef52e76fa5e408a6
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Jan 31 16:54:58 2021 +0100

    CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in 
transacted mode will cause stackframe sizes to grown deep and lead to stack 
overflow error.
---
 .../interceptor/TransactedStackSizeBreakOnExceptionTest.java   |  2 +-
 .../interceptor/TransactedStackSizeParallelProcessingTest.java | 10 ++++------
 .../camel/spring/interceptor/TransactedStackSizeTest.java      |  4 ++--
 .../java/org/apache/camel/processor/MulticastProcessor.java    |  3 ++-
 4 files changed, 9 insertions(+), 10 deletions(-)

diff --git 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
index 1eb29f2..1a6b5b6 100644
--- 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
+++ 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
@@ -49,7 +49,7 @@ public class TransactedStackSizeBreakOnExceptionTest extends 
TransactionClientDa
                     int.class);
             sizes[i] = size;
             Assertions.assertTrue(size < 100, "Stackframe should be < 100");
-            log.info("#{} size {}", i, size);
+            log.debug("#{} size {}", i, size);
         }
 
         int prev = sizes[0];
diff --git 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
index 739a9ed..729c78d 100644
--- 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
+++ 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
@@ -18,17 +18,15 @@ package org.apache.camel.spring.interceptor;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.RepeatedTest;
-import org.junit.jupiter.api.Test;
 
 public class TransactedStackSizeParallelProcessingTest extends 
TransactionClientDataSourceSupport {
 
     private int total = 100;
     private static final boolean PRINT_STACK_TRACE = false;
 
-    @Disabled("Flaky - May report 101 or 102 messages")
-    @RepeatedTest(value = 100)
+    // to test for flaky when using parallel processing then set this to 100
+    @RepeatedTest(value = 1)
     public void testStackSize() throws Exception {
         getMockEndpoint("mock:line").expectedMessageCount(total);
         getMockEndpoint("mock:line").assertNoDuplicates(body());
@@ -49,11 +47,11 @@ public class TransactedStackSizeParallelProcessingTest 
extends TransactionClient
                     int.class);
             sizes[i] = size;
             Assertions.assertTrue(size < 100, "Stackframe should be < 100");
-            log.info("#{} size {}", i, size);
+            log.debug("#{} size {}", i, size);
         }
         int size = 
getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize",
 int.class);
         sizes[total] = size;
-        log.info("#{} size {}", total, size);
+        log.debug("#{} size {}", total, size);
 
         int prev = sizes[0];
         // last may be shorter, so use total - 1
diff --git 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
index 2f0bf71..984121b 100644
--- 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
+++ 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
@@ -46,11 +46,11 @@ public class TransactedStackSizeTest extends 
TransactionClientDataSourceSupport
                     int.class);
             sizes[i] = size;
             Assertions.assertTrue(size < 100, "Stackframe should be < 100");
-            log.info("#{} size {}", i, size);
+            log.debug("#{} size {}", i, size);
         }
         int size = 
getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize",
 int.class);
         sizes[total] = size;
-        log.info("#{} size {}", total, size);
+        log.debug("#{} size {}", total, size);
 
         int prev = sizes[0];
         // last may be shorter, so use total - 1
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index f005dc9..948f514 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -288,7 +288,8 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         // must handle this specially in a while loop structure to ensure the 
strackframe does not grow deeper
         // the reactive mode will execute each sub task in its own runnable 
task which is scheduled on the reactive executor
         // which is how the routing engine normally operates
-        MulticastTask state = exchange.isTransacted()
+        // if we have parallel processing enabled then we cannot run in 
transacted mode (requires synchronous processing via same thread)
+        MulticastTask state = !isParallelProcessing() && 
exchange.isTransacted()
                 ? new MulticastTransactedTask(exchange, pairs, callback) : new 
MulticastReactiveTask(exchange, pairs, callback);
         if (isParallelProcessing()) {
             executorService.submit(() -> reactiveExecutor.schedule(state));

Reply via email to