This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 961ad0e56e9331e71c386415ec67676e586ea629 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Oct 2 13:36:23 2021 +0200 CAMEL-16938: camel-core - Using transacted could cause StackOverflowException in some complex use cases. Changed ordering of executing transacted tasks in ReactiveExecutor to run from queue, which allows the waiting callee thread to run the task (so it runs with the right thread) but this also collapses the stack depth. --- components/camel-reactive-executor-vertx/pom.xml | 2 +- .../reactive/vertx/VertXReactiveExecutor.java | 6 ++ .../TransactedRetryWhileStackSizeTest.java | 90 ++++++++++++++++++++++ .../org/apache/camel/spi/ReactiveExecutor.java | 9 +++ .../camel/impl/engine/DefaultReactiveExecutor.java | 8 ++ .../org/apache/camel/processor/LoopProcessor.java | 2 +- .../apache/camel/processor/MulticastProcessor.java | 2 +- .../java/org/apache/camel/processor/Pipeline.java | 2 +- .../errorhandler/RedeliveryErrorHandler.java | 2 +- .../issues/RetryWhileStackOverflowIssueTest.java | 84 ++++++++++++++++++++ 10 files changed, 202 insertions(+), 5 deletions(-) diff --git a/components/camel-reactive-executor-vertx/pom.xml b/components/camel-reactive-executor-vertx/pom.xml index 44a4aed..2190fd5 100644 --- a/components/camel-reactive-executor-vertx/pom.xml +++ b/components/camel-reactive-executor-vertx/pom.xml @@ -35,7 +35,7 @@ <firstVersion>3.0.0</firstVersion> <label>reactive</label> <title>Reactive Executor Vert.x</title> - <supportLevel>experimental</supportLevel> + <supportLevel>Experimental</supportLevel> </properties> <dependencies> diff --git a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java index 51c1c87..c778bca 100644 --- a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java +++ 
b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java @@ -107,6 +107,12 @@ public class VertXReactiveExecutor extends ServiceSupport implements CamelContex } @Override + public void scheduleQueue(Runnable runnable) { + // not supported so schedule sync + scheduleSync(runnable); + } + + @Override public boolean executeFromQueue() { // not supported so return false return false; diff --git a/components/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactedRetryWhileStackSizeTest.java b/components/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactedRetryWhileStackSizeTest.java new file mode 100644 index 0000000..c1384fd --- /dev/null +++ b/components/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactedRetryWhileStackSizeTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.spring.interceptor; + +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TransactedRetryWhileStackSizeTest extends TransactionClientDataSourceSupport { + + private static final Logger LOG = LoggerFactory.getLogger(TransactedRetryWhileStackSizeTest.class); + private static final boolean PRINT_STACK_TRACE = false; + private static final int MAX_DEPTH = 100; + + @Test + public void testStackSize() throws Exception { + getMockEndpoint("mock:error").expectedMessageCount(1); + getMockEndpoint("mock:error").message(0).body().isInstanceOf(MyCoolDude.class); + + MyCoolDude dude = new MyCoolDude(); + template.sendBody("seda:start", dude); + + assertMockEndpointsSatisfied(); + + assertEquals(1000 + 1, dude.getCounter()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onException().retryWhile(simple("${body.areWeCool} == 'no'")).redeliveryDelay(0) + .handled(true).to("mock:error"); + + from("seda:start") + .transacted() + .recipientList(constant("foo:unknown")); + } + }; + } + + public static int currentStackSize() { + int depth = Thread.currentThread().getStackTrace().length; + if (PRINT_STACK_TRACE) { + new Throwable("Printing Stacktrace depth: " + depth).printStackTrace(System.err); + } + return depth; + } + + public static class MyCoolDude { + + private int counter; + + public String areWeCool() { + int size = currentStackSize(); + if (size > MAX_DEPTH) { + LOG.error("Stacktrace max depth: {}", size); + return "no"; + } + if (counter++ < 1000) { + return "no"; + } else { + return "yes"; + } + } + + public int getCounter() { + return counter; + } + } + +} diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java index e78fb3d..aabffe3 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java @@ -48,6 +48,15 @@ public interface ReactiveExecutor { void scheduleSync(Runnable runnable); /** + * Schedules the task to be run later from the queue (current thread) + * + * This is used for routing {@link org.apache.camel.Exchange} using transactions. + * + * @param runnable the task + */ + void scheduleQueue(Runnable runnable); + + /** * Executes the next task (if supported by the reactive executor implementation) * * @return true if a task was executed or false if no more pending tasks diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java index 343254d..213bacf 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java @@ -70,6 +70,14 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE } @Override + public void scheduleQueue(Runnable runnable) { + if (LOG.isTraceEnabled()) { + LOG.trace("ScheduleQueue: {}", runnable); + } + workers.get().queue.add(runnable); + } + + @Override public boolean executeFromQueue() { return workers.get().executeFromQueue(); } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java index 435ed9d..fe91562 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -75,7 +75,7 @@ public class 
LoopProcessor extends DelegateAsyncProcessor implements Traceable, LoopState state = new LoopState(exchange, callback); if (exchange.isTransacted()) { - reactiveExecutor.scheduleSync(state); + reactiveExecutor.scheduleQueue(state); } else { reactiveExecutor.scheduleMain(state); } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index d125723..4c9dcce 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -330,7 +330,7 @@ public class MulticastProcessor extends AsyncProcessorSupport executorService.submit(() -> reactiveExecutor.schedule(state)); } else { if (exchange.isTransacted()) { - reactiveExecutor.scheduleSync(state); + reactiveExecutor.scheduleQueue(state); } else { reactiveExecutor.scheduleMain(state); } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java index 06e69f6..29d7596 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java @@ -179,7 +179,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo PooledExchangeTask task = taskFactory.acquire(exchange, callback); if (exchange.isTransacted()) { - reactiveExecutor.scheduleSync(task); + reactiveExecutor.scheduleQueue(task); } else { reactiveExecutor.scheduleMain(task); } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 65d03d5..385fac4 100644 --- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -211,7 +211,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // Run it if (exchange.isTransacted()) { - reactiveExecutor.scheduleSync(task); + reactiveExecutor.scheduleQueue(task); } else { reactiveExecutor.scheduleMain(task); } diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/RetryWhileStackOverflowIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/RetryWhileStackOverflowIssueTest.java new file mode 100644 index 0000000..b2f6d59 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/issues/RetryWhileStackOverflowIssueTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.issues; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class RetryWhileStackOverflowIssueTest extends ContextTestSupport { + + private static final boolean PRINT_STACK_TRACE = false; + + @Test + public void testRetry() throws Exception { + getMockEndpoint("mock:error").expectedMessageCount(1); + getMockEndpoint("mock:error").message(0).body().isInstanceOf(MyCoolDude.class); + + MyCoolDude dude = new MyCoolDude(); + template.sendBody("direct:start", dude); + + assertMockEndpointsSatisfied(); + + assertEquals(1000 + 1, dude.getCounter()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(IllegalArgumentException.class).retryWhile(simple("${body.areWeCool} == 'no'")).redeliveryDelay(0) + .handled(true).to("mock:error"); + + from("direct:start") + // .transacted() + .throwException(new IllegalArgumentException("Forced")); + } + }; + } + + public static class MyCoolDude { + + private int counter; + + public String areWeCool() { + int size = currentStackSize(); + System.out.println("Stacksize: " + size); + if (counter++ < 1000) { + return "no"; + } else { + return "yes"; + } + } + + public int getCounter() { + return counter; + } + } + + private static int currentStackSize() { + int depth = Thread.currentThread().getStackTrace().length; + if (PRINT_STACK_TRACE) { + new Throwable("Printing Stacktrace depth: " + depth).printStackTrace(System.err); + } + return depth; + } + +}