This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0b185d12d9436acda7f2160594cd27e2c10cb3b1
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Jan 31 12:34:24 2021 +0100

    CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in 
transacted mode will cause stackframe sizes to grown deep and lead to stack 
overflow error.
---
 .../interceptor/TransactedStackSizeTest.java       |  31 +++-
 .../apache/camel/processor/MulticastProcessor.java | 206 ++++++++++++++++-----
 .../ROOT/pages/camel-3x-upgrade-guide-3_8.adoc     |   9 +
 3 files changed, 194 insertions(+), 52 deletions(-)

diff --git 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
index 69e0206..9aa6b67 100644
--- 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
+++ 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
@@ -17,31 +17,46 @@
 package org.apache.camel.spring.interceptor;
 
 import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class TransactedStackSizeTest extends 
TransactionClientDataSourceSupport {
 
-    private static final boolean PRINT_STACK_TRACE = true;
+    private int total = 100;
+    private static final boolean PRINT_STACK_TRACE = false;
 
     @Test
     public void testStackSize() throws Exception {
-        getMockEndpoint("mock:line").expectedMessageCount(10);
+        getMockEndpoint("mock:line").expectedMessageCount(total);
         getMockEndpoint("mock:result").expectedMessageCount(1);
 
-        template.sendBody("seda:start", "A,B,C,D,E,F,G,H,I,J");
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < total; i++) {
+            sb.append(i);
+            sb.append(",");
+        }
+        template.sendBody("seda:start", "" + sb.toString());
 
         assertMockEndpointsSatisfied();
 
-        int[] sizes = new int[11];
-        for (int i = 0; i < 10; i++) {
+        int[] sizes = new int[total + 1];
+        for (int i = 0; i < total; i++) {
             int size = 
getMockEndpoint("mock:line").getReceivedExchanges().get(i).getMessage().getHeader("stackSize",
                     int.class);
             sizes[i] = size;
+            Assertions.assertTrue(size < 100, "Stackframe should be < 100");
             log.info("#{} size {}", i, size);
         }
         int size = 
getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize",
 int.class);
-        sizes[10] = size;
-        log.info("#{} size {}", 10, size);
+        sizes[total] = size;
+        log.info("#{} size {}", total, size);
+
+        int prev = sizes[0];
+        // last may be shorter, so use total - 1
+        for (int i = 1; i < total - 1; i++) {
+            size = sizes[i];
+            Assertions.assertEquals(prev, size, "Stackframe should be same 
size");
+        }
     }
 
     @Override
@@ -53,9 +68,11 @@ public class TransactedStackSizeTest extends 
TransactionClientDataSourceSupport
                     .transacted()
                     .split(body())
                         .setHeader("stackSize", 
TransactedStackSizeTest::currentStackSize)
+                        .log("${body} stack-size ${header.stackSize}")
                         .to("mock:line")
                     .end()
                     .setHeader("stackSize", 
TransactedStackSizeTest::currentStackSize)
+                    .log("${body} stack-size ${header.stackSize}")
                     .to("mock:result");
             }
         };
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index c8c4559..b87a21d 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -282,7 +282,14 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             return true;
         }
 
-        MulticastTask state = new MulticastTask(exchange, pairs, callback);
+        // we need to run in either transacted or reactive mode because the 
threading model is different
+        // when we run in transacted mode, then we synchronous processing on 
the current thread
+        // this can lead to a long execution which can lead to deep 
stackframes, and therefore we
+        // must handle this specially in a while loop structure to ensure the 
strackframe does not grow deeper
+        // the reactive mode will execute each sub task in its own runnable 
task which is scheduled on the reactive executor
+        // which is how the routing engine normally operates
+        AbstractMulticastTask state = exchange.isTransacted()
+                ? new MulticastTransactedTask(exchange, pairs, callback) : new 
MulticastTask(exchange, pairs, callback);
         if (isParallelProcessing()) {
             executorService.submit(() -> reactiveExecutor.schedule(state));
         } else {
@@ -307,7 +314,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         }
     }
 
-    protected class MulticastTask implements Runnable {
+    protected abstract class AbstractMulticastTask implements Runnable {
 
         final Exchange original;
         final Iterable<ProcessorExchangePair> pairs;
@@ -321,7 +328,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         final AtomicBoolean allSent = new AtomicBoolean();
         final AtomicBoolean done = new AtomicBoolean();
 
-        MulticastTask(Exchange original, Iterable<ProcessorExchangePair> 
pairs, AsyncCallback callback) {
+        AbstractMulticastTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             this.original = original;
             this.pairs = pairs;
             this.callback = callback;
@@ -339,6 +346,71 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             return "MulticastTask";
         }
 
+        protected void aggregate() {
+            Lock lock = this.lock;
+            if (lock.tryLock()) {
+                try {
+                    Exchange exchange;
+                    while (!done.get() && (exchange = completion.poll()) != 
null) {
+                        doAggregate(result, exchange, original);
+                        if (nbAggregated.incrementAndGet() >= 
nbExchangeSent.get() && allSent.get()) {
+                            doDone(result.get(), true);
+                        }
+                    }
+                } catch (Throwable e) {
+                    original.setException(e);
+                    // and do the done work
+                    doDone(null, false);
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        protected void timeout() {
+            Lock lock = this.lock;
+            if (lock.tryLock()) {
+                try {
+                    while (nbAggregated.get() < nbExchangeSent.get()) {
+                        Exchange exchange = completion.pollUnordered();
+                        int index = exchange != null ? 
getExchangeIndex(exchange) : nbExchangeSent.get();
+                        while (nbAggregated.get() < index) {
+                            AggregationStrategy strategy = 
getAggregationStrategy(null);
+                            strategy.timeout(result.get() != null ? 
result.get() : original,
+                                    nbAggregated.getAndIncrement(), 
nbExchangeSent.get(), timeout);
+                        }
+                        if (exchange != null) {
+                            doAggregate(result, exchange, original);
+                            nbAggregated.incrementAndGet();
+                        }
+                    }
+                    doDone(result.get(), true);
+                } catch (Throwable e) {
+                    original.setException(e);
+                    // and do the done work
+                    doDone(null, false);
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        protected void doDone(Exchange exchange, boolean forceExhaust) {
+            if (done.compareAndSet(false, true)) {
+                MulticastProcessor.this.doDone(original, exchange, pairs, 
callback, false, forceExhaust);
+            }
+        }
+    }
+
+    /**
+     * Sub taks processed reactive via the {@link ReactiveExecutor}.
+     */
+    protected class MulticastTask extends AbstractMulticastTask {
+
+        public MulticastTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+            super(original, pairs, callback);
+        }
+
         @Override
         public void run() {
             try {
@@ -421,59 +493,103 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             }
         }
 
-        protected void aggregate() {
-            Lock lock = this.lock;
-            if (lock.tryLock()) {
+    }
+
+    /**
+     * Transacted sub task processed synchronously using {@link 
Processor#process(Exchange)} with the same thread in a
+     * while loop control flow.
+     */
+    protected class MulticastTransactedTask extends AbstractMulticastTask {
+
+        public MulticastTransactedTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+            super(original, pairs, callback);
+        }
+
+        @Override
+        public void run() {
+            boolean next = true;
+            while (next) {
                 try {
-                    Exchange exchange;
-                    while (!done.get() && (exchange = completion.poll()) != 
null) {
-                        doAggregate(result, exchange, original);
-                        if (nbAggregated.incrementAndGet() >= 
nbExchangeSent.get() && allSent.get()) {
-                            doDone(result.get(), true);
-                        }
-                    }
-                } catch (Throwable e) {
+                    next = doRun();
+                } catch (Exception e) {
                     original.setException(e);
-                    // and do the done work
                     doDone(null, false);
-                } finally {
-                    lock.unlock();
+                    return;
                 }
             }
         }
 
-        protected void timeout() {
-            Lock lock = this.lock;
-            if (lock.tryLock()) {
-                try {
-                    while (nbAggregated.get() < nbExchangeSent.get()) {
-                        Exchange exchange = completion.pollUnordered();
-                        int index = exchange != null ? 
getExchangeIndex(exchange) : nbExchangeSent.get();
-                        while (nbAggregated.get() < index) {
-                            AggregationStrategy strategy = 
getAggregationStrategy(null);
-                            strategy.timeout(result.get() != null ? 
result.get() : original,
-                                    nbAggregated.getAndIncrement(), 
nbExchangeSent.get(), timeout);
-                        }
-                        if (exchange != null) {
-                            doAggregate(result, exchange, original);
-                            nbAggregated.incrementAndGet();
-                        }
-                    }
-                    doDone(result.get(), true);
-                } catch (Throwable e) {
-                    original.setException(e);
-                    // and do the done work
-                    doDone(null, false);
-                } finally {
-                    lock.unlock();
+        boolean doRun() throws Exception {
+            if (done.get()) {
+                return false;
+            }
+
+            // Check if the iterator is empty
+            // This can happen the very first time we check the existence
+            // of an item before queuing the run.
+            // or some iterators may return true for hasNext() but then null 
in next()
+            if (!iterator.hasNext()) {
+                doDone(result.get(), true);
+                return false;
+            }
+
+            ProcessorExchangePair pair = iterator.next();
+            boolean hasNext = iterator.hasNext();
+            // some iterators may return true for hasNext() but then null in 
next()
+            if (pair == null && !hasNext) {
+                doDone(result.get(), true);
+                return false;
+            }
+
+            Exchange exchange = pair.getExchange();
+            int index = nbExchangeSent.getAndIncrement();
+            updateNewExchange(exchange, index, pairs, hasNext);
+
+            // Schedule the processing of the next pair
+            if (hasNext) {
+                if (isParallelProcessing()) {
+                    schedule(this);
                 }
+            } else {
+                allSent.set(true);
             }
-        }
 
-        protected void doDone(Exchange exchange, boolean forceExhaust) {
-            if (done.compareAndSet(false, true)) {
-                MulticastProcessor.this.doDone(original, exchange, pairs, 
callback, false, forceExhaust);
+            // process next
+
+            // compute time taken if sending to another endpoint
+            StopWatch watch = beforeSend(pair);
+            Processor sync = pair.getProcessor();
+            try {
+                sync.process(exchange);
+            } finally {
+                afterSend(pair, watch);
+            }
+
+            // Decide whether to continue with the multicast or not; similar 
logic to the Pipeline
+            // remember to test for stop on exception and aggregate before 
copying back results
+            boolean continueProcessing = 
PipelineHelper.continueProcessing(exchange,
+                    "Multicast processing failed for number " + index, LOG);
+            if (stopOnException && !continueProcessing) {
+                if (exchange.getException() != null) {
+                    // wrap in exception to explain where it failed
+                    exchange.setException(new CamelExchangeException(
+                            "Multicast processing failed for number " + index, 
exchange, exchange.getException()));
+                } else {
+                    // we want to stop on exception, and the exception was 
handled by the error handler
+                    // this is similar to what the pipeline does, so we should 
do the same to not surprise end users
+                    // so we should set the failed exchange as the result and 
be done
+                    result.set(exchange);
+                }
+                // and do the done work
+                doDone(exchange, true);
+                return false;
             }
+
+            // aggregate exchanges if any
+            aggregate();
+
+            // next step
+            return true;
         }
     }
 
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc
index 11c67f6..585bcf6 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc
@@ -33,6 +33,15 @@ The method `isOnlyDynamicQueryParameters` is removed from 
`org.apache.camel.spi.
 
 The `onCompletion` EIP has fixed it could trigger multiple completions for a 
given `Exchange`
 
+=== Transactions and Multicast, Splitter, or Recipient List EIPs
+
+When using `transacted` in Camel routes with Multicast, Splitter, or Recipient 
List EIPs, then the exection strackframe
+may grown deep and could cause Stack overflow exception. This has been fixed 
by refactoring the EIP into a special
+transacted mode and the existing reactive mode.
+
+We do not anticipate any issues but if you are using transactions and these 
EIPs then we would like to have feedback
+if you encounter any problems with upgrading.
+
 === camel-jackson
 
 When using XML DSL then `jsonView` has been renamed to `jsonViewTypeName` and 
made general available in the model,

Reply via email to