This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0b185d12d9436acda7f2160594cd27e2c10cb3b1 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Jan 31 12:34:24 2021 +0100 CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode causing stackframe sizes to grow deep and lead to stack overflow error. --- .../interceptor/TransactedStackSizeTest.java | 31 +++- .../apache/camel/processor/MulticastProcessor.java | 206 ++++++++++++++++----- .../ROOT/pages/camel-3x-upgrade-guide-3_8.adoc | 9 + 3 files changed, 194 insertions(+), 52 deletions(-) diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java index 69e0206..9aa6b67 100644 --- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java @@ -17,31 +17,46 @@ package org.apache.camel.spring.interceptor; import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class TransactedStackSizeTest extends TransactionClientDataSourceSupport { - private static final boolean PRINT_STACK_TRACE = true; + private int total = 100; + private static final boolean PRINT_STACK_TRACE = false; @Test public void testStackSize() throws Exception { - getMockEndpoint("mock:line").expectedMessageCount(10); + getMockEndpoint("mock:line").expectedMessageCount(total); getMockEndpoint("mock:result").expectedMessageCount(1); - template.sendBody("seda:start", "A,B,C,D,E,F,G,H,I,J"); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < total; i++) { + sb.append(i); + sb.append(","); + } + template.sendBody("seda:start", "" + sb.toString()); assertMockEndpointsSatisfied(); - int[] sizes = new int[11]; - for (int i = 0; i < 10; i++) { + int[] sizes = new int[total 
+ 1]; + for (int i = 0; i < total; i++) { int size = getMockEndpoint("mock:line").getReceivedExchanges().get(i).getMessage().getHeader("stackSize", int.class); sizes[i] = size; + Assertions.assertTrue(size < 100, "Stackframe should be < 100"); log.info("#{} size {}", i, size); } int size = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize", int.class); - sizes[10] = size; - log.info("#{} size {}", 10, size); + sizes[total] = size; + log.info("#{} size {}", total, size); + + int prev = sizes[0]; + // last may be shorter, so use total - 1 + for (int i = 1; i < total - 1; i++) { + size = sizes[i]; + Assertions.assertEquals(prev, size, "Stackframe should be same size"); + } } @Override @@ -53,9 +68,11 @@ public class TransactedStackSizeTest extends TransactionClientDataSourceSupport .transacted() .split(body()) .setHeader("stackSize", TransactedStackSizeTest::currentStackSize) + .log("${body} stack-size ${header.stackSize}") .to("mock:line") .end() .setHeader("stackSize", TransactedStackSizeTest::currentStackSize) + .log("${body} stack-size ${header.stackSize}") .to("mock:result"); } }; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index c8c4559..b87a21d 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -282,7 +282,14 @@ public class MulticastProcessor extends AsyncProcessorSupport return true; } - MulticastTask state = new MulticastTask(exchange, pairs, callback); + // we need to run in either transacted or reactive mode because the threading model is different + // when we run in transacted mode, then we synchronous processing on the current thread + // this can lead to a long execution which can lead to deep stackframes, 
and therefore we + // must handle this specially in a while loop structure to ensure the strackframe does not grow deeper + // the reactive mode will execute each sub task in its own runnable task which is scheduled on the reactive executor + // which is how the routing engine normally operates + AbstractMulticastTask state = exchange.isTransacted() + ? new MulticastTransactedTask(exchange, pairs, callback) : new MulticastTask(exchange, pairs, callback); if (isParallelProcessing()) { executorService.submit(() -> reactiveExecutor.schedule(state)); } else { @@ -307,7 +314,7 @@ public class MulticastProcessor extends AsyncProcessorSupport } } - protected class MulticastTask implements Runnable { + protected abstract class AbstractMulticastTask implements Runnable { final Exchange original; final Iterable<ProcessorExchangePair> pairs; @@ -321,7 +328,7 @@ public class MulticastProcessor extends AsyncProcessorSupport final AtomicBoolean allSent = new AtomicBoolean(); final AtomicBoolean done = new AtomicBoolean(); - MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { + AbstractMulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { this.original = original; this.pairs = pairs; this.callback = callback; @@ -339,6 +346,71 @@ public class MulticastProcessor extends AsyncProcessorSupport return "MulticastTask"; } + protected void aggregate() { + Lock lock = this.lock; + if (lock.tryLock()) { + try { + Exchange exchange; + while (!done.get() && (exchange = completion.poll()) != null) { + doAggregate(result, exchange, original); + if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) { + doDone(result.get(), true); + } + } + } catch (Throwable e) { + original.setException(e); + // and do the done work + doDone(null, false); + } finally { + lock.unlock(); + } + } + } + + protected void timeout() { + Lock lock = this.lock; + if (lock.tryLock()) { + try { + while 
(nbAggregated.get() < nbExchangeSent.get()) { + Exchange exchange = completion.pollUnordered(); + int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get(); + while (nbAggregated.get() < index) { + AggregationStrategy strategy = getAggregationStrategy(null); + strategy.timeout(result.get() != null ? result.get() : original, + nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout); + } + if (exchange != null) { + doAggregate(result, exchange, original); + nbAggregated.incrementAndGet(); + } + } + doDone(result.get(), true); + } catch (Throwable e) { + original.setException(e); + // and do the done work + doDone(null, false); + } finally { + lock.unlock(); + } + } + } + + protected void doDone(Exchange exchange, boolean forceExhaust) { + if (done.compareAndSet(false, true)) { + MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust); + } + } + } + + /** + * Sub taks processed reactive via the {@link ReactiveExecutor}. + */ + protected class MulticastTask extends AbstractMulticastTask { + + public MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { + super(original, pairs, callback); + } + @Override public void run() { try { @@ -421,59 +493,103 @@ public class MulticastProcessor extends AsyncProcessorSupport } } - protected void aggregate() { - Lock lock = this.lock; - if (lock.tryLock()) { + } + + /** + * Transacted sub task processed synchronously using {@link Processor#process(Exchange)} with the same thread in a + * while loop control flow. 
+ */ + protected class MulticastTransactedTask extends AbstractMulticastTask { + + public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { + super(original, pairs, callback); + } + + @Override + public void run() { + boolean next = true; + while (next) { try { - Exchange exchange; - while (!done.get() && (exchange = completion.poll()) != null) { - doAggregate(result, exchange, original); - if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) { - doDone(result.get(), true); - } - } - } catch (Throwable e) { + next = doRun(); + } catch (Exception e) { original.setException(e); - // and do the done work doDone(null, false); - } finally { - lock.unlock(); + return; } } } - protected void timeout() { - Lock lock = this.lock; - if (lock.tryLock()) { - try { - while (nbAggregated.get() < nbExchangeSent.get()) { - Exchange exchange = completion.pollUnordered(); - int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get(); - while (nbAggregated.get() < index) { - AggregationStrategy strategy = getAggregationStrategy(null); - strategy.timeout(result.get() != null ? result.get() : original, - nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout); - } - if (exchange != null) { - doAggregate(result, exchange, original); - nbAggregated.incrementAndGet(); - } - } - doDone(result.get(), true); - } catch (Throwable e) { - original.setException(e); - // and do the done work - doDone(null, false); - } finally { - lock.unlock(); + boolean doRun() throws Exception { + if (done.get()) { + return false; + } + + // Check if the iterator is empty + // This can happen the very first time we check the existence + // of an item before queuing the run. 
+ // or some iterators may return true for hasNext() but then null in next() + if (!iterator.hasNext()) { + doDone(result.get(), true); + return false; + } + + ProcessorExchangePair pair = iterator.next(); + boolean hasNext = iterator.hasNext(); + // some iterators may return true for hasNext() but then null in next() + if (pair == null && !hasNext) { + doDone(result.get(), true); + return false; + } + + Exchange exchange = pair.getExchange(); + int index = nbExchangeSent.getAndIncrement(); + updateNewExchange(exchange, index, pairs, hasNext); + + // Schedule the processing of the next pair + if (hasNext) { + if (isParallelProcessing()) { + schedule(this); } + } else { + allSent.set(true); } - } - protected void doDone(Exchange exchange, boolean forceExhaust) { - if (done.compareAndSet(false, true)) { - MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust); + // process next + + // compute time taken if sending to another endpoint + StopWatch watch = beforeSend(pair); + Processor sync = pair.getProcessor(); + try { + sync.process(exchange); + } finally { + afterSend(pair, watch); + } + + // Decide whether to continue with the multicast or not; similar logic to the Pipeline + // remember to test for stop on exception and aggregate before copying back results + boolean continueProcessing = PipelineHelper.continueProcessing(exchange, + "Multicast processing failed for number " + index, LOG); + if (stopOnException && !continueProcessing) { + if (exchange.getException() != null) { + // wrap in exception to explain where it failed + exchange.setException(new CamelExchangeException( + "Multicast processing failed for number " + index, exchange, exchange.getException())); + } else { + // we want to stop on exception, and the exception was handled by the error handler + // this is similar to what the pipeline does, so we should do the same to not surprise end users + // so we should set the failed exchange as the result and be done + 
result.set(exchange); + } + // and do the done work + doDone(exchange, true); + return false; } + + // aggregate exchanges if any + aggregate(); + + // next step + return true; } } diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc index 11c67f6..585bcf6 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc @@ -33,6 +33,15 @@ The method `isOnlyDynamicQueryParameters` is removed from `org.apache.camel.spi. The `onCompletion` EIP has fixed it could trigger multiple completions for a given `Exchange` +=== Transactions and Multicast, Splitter, or Recipient List EIPs + +When using `transacted` in Camel routes with Multicast, Splitter, or Recipient List EIPs, then the execution stackframe +may grow deep and could cause a stack overflow error. This has been fixed by refactoring the EIP into a special +transacted mode and the existing reactive mode. + +We do not anticipate any issues but if you are using transactions and these EIPs then we would like to have feedback +if you encounter any problems with upgrading. + === camel-jackson When using XML DSL then `jsonView` has been renamed to `jsonViewTypeName` and made general available in the model,