This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 961ad0e56e9331e71c386415ec67676e586ea629
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sat Oct 2 13:36:23 2021 +0200

    CAMEL-16938: camel-core - Using transacted could cause 
StackOverflowException in some complex use cases. Changed ordering of executing 
transacted tasks in ReactiveExecutor to run from queue, which allows the 
waiting callee thread to run the task (so it runs with the right thread) but 
this also collapses the stack depth.
---
 components/camel-reactive-executor-vertx/pom.xml   |  2 +-
 .../reactive/vertx/VertXReactiveExecutor.java      |  6 ++
 .../TransactedRetryWhileStackSizeTest.java         | 90 ++++++++++++++++++++++
 .../org/apache/camel/spi/ReactiveExecutor.java     |  9 +++
 .../camel/impl/engine/DefaultReactiveExecutor.java |  8 ++
 .../org/apache/camel/processor/LoopProcessor.java  |  2 +-
 .../apache/camel/processor/MulticastProcessor.java |  2 +-
 .../java/org/apache/camel/processor/Pipeline.java  |  2 +-
 .../errorhandler/RedeliveryErrorHandler.java       |  2 +-
 .../issues/RetryWhileStackOverflowIssueTest.java   | 84 ++++++++++++++++++++
 10 files changed, 202 insertions(+), 5 deletions(-)

diff --git a/components/camel-reactive-executor-vertx/pom.xml 
b/components/camel-reactive-executor-vertx/pom.xml
index 44a4aed..2190fd5 100644
--- a/components/camel-reactive-executor-vertx/pom.xml
+++ b/components/camel-reactive-executor-vertx/pom.xml
@@ -35,7 +35,7 @@
         <firstVersion>3.0.0</firstVersion>
         <label>reactive</label>
         <title>Reactive Executor Vert.x</title>
-        <supportLevel>experimental</supportLevel>
+        <supportLevel>Experimental</supportLevel>
     </properties>
 
     <dependencies>
diff --git 
a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
 
b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 51c1c87..c778bca 100644
--- 
a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ 
b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -107,6 +107,12 @@ public class VertXReactiveExecutor extends ServiceSupport 
implements CamelContex
     }
 
     @Override
+    public void scheduleQueue(Runnable runnable) {
+        // not supported so schedule sync
+        scheduleSync(runnable);
+    }
+
+    @Override
     public boolean executeFromQueue() {
         // not supported so return false
         return false;
diff --git 
a/components/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactedRetryWhileStackSizeTest.java
 
b/components/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactedRetryWhileStackSizeTest.java
new file mode 100644
index 0000000..c1384fd
--- /dev/null
+++ 
b/components/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactedRetryWhileStackSizeTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.interceptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TransactedRetryWhileStackSizeTest extends 
TransactionClientDataSourceSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TransactedRetryWhileStackSizeTest.class);
+    private static final boolean PRINT_STACK_TRACE = false;
+    private static final int MAX_DEPTH = 100;
+
+    @Test
+    public void testStackSize() throws Exception {
+        getMockEndpoint("mock:error").expectedMessageCount(1);
+        
getMockEndpoint("mock:error").message(0).body().isInstanceOf(MyCoolDude.class);
+
+        MyCoolDude dude = new MyCoolDude();
+        template.sendBody("seda:start", dude);
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1000 + 1, dude.getCounter());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException().retryWhile(simple("${body.areWeCool} == 
'no'")).redeliveryDelay(0)
+                        .handled(true).to("mock:error");
+
+                from("seda:start")
+                        .transacted()
+                        .recipientList(constant("foo:unknown"));
+            }
+        };
+    }
+
+    public static int currentStackSize() {
+        int depth = Thread.currentThread().getStackTrace().length;
+        if (PRINT_STACK_TRACE) {
+            new Throwable("Printing Stacktrace depth: " + 
depth).printStackTrace(System.err);
+        }
+        return depth;
+    }
+
+    public static class MyCoolDude {
+
+        private int counter;
+
+        public String areWeCool() {
+            int size = currentStackSize();
+            if (size > MAX_DEPTH) {
+                LOG.error("Stacktrace max depth: {}", size);
+                return "no";
+            }
+            if (counter++ < 1000) {
+                return "no";
+            } else {
+                return "yes";
+            }
+        }
+
+        public int getCounter() {
+            return counter;
+        }
+    }
+
+}
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index e78fb3d..aabffe3 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -48,6 +48,15 @@ public interface ReactiveExecutor {
     void scheduleSync(Runnable runnable);
 
     /**
+     * Schedules the task to be run later from the queue (current thread)
+     *
+     * This is used for routing {@link org.apache.camel.Exchange} using 
transactions.
+     *
+     * @param runnable the task
+     */
+    void scheduleQueue(Runnable runnable);
+
+    /**
      * Executes the next task (if supported by the reactive executor 
implementation)
      *
      * @return true if a task was executed or false if no more pending tasks
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 343254d..213bacf 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -70,6 +70,14 @@ public class DefaultReactiveExecutor extends ServiceSupport 
implements ReactiveE
     }
 
     @Override
+    public void scheduleQueue(Runnable runnable) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("ScheduleQueue: {}", runnable);
+        }
+        workers.get().queue.add(runnable);
+    }
+
+    @Override
     public boolean executeFromQueue() {
         return workers.get().executeFromQueue();
     }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 435ed9d..fe91562 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -75,7 +75,7 @@ public class LoopProcessor extends DelegateAsyncProcessor 
implements Traceable,
             LoopState state = new LoopState(exchange, callback);
 
             if (exchange.isTransacted()) {
-                reactiveExecutor.scheduleSync(state);
+                reactiveExecutor.scheduleQueue(state);
             } else {
                 reactiveExecutor.scheduleMain(state);
             }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index d125723..4c9dcce 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -330,7 +330,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             executorService.submit(() -> reactiveExecutor.schedule(state));
         } else {
             if (exchange.isTransacted()) {
-                reactiveExecutor.scheduleSync(state);
+                reactiveExecutor.scheduleQueue(state);
             } else {
                 reactiveExecutor.scheduleMain(state);
             }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
index 06e69f6..29d7596 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -179,7 +179,7 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
         PooledExchangeTask task = taskFactory.acquire(exchange, callback);
 
         if (exchange.isTransacted()) {
-            reactiveExecutor.scheduleSync(task);
+            reactiveExecutor.scheduleQueue(task);
         } else {
             reactiveExecutor.scheduleMain(task);
         }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 65d03d5..385fac4 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -211,7 +211,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
 
         // Run it
         if (exchange.isTransacted()) {
-            reactiveExecutor.scheduleSync(task);
+            reactiveExecutor.scheduleQueue(task);
         } else {
             reactiveExecutor.scheduleMain(task);
         }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/issues/RetryWhileStackOverflowIssueTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/issues/RetryWhileStackOverflowIssueTest.java
new file mode 100644
index 0000000..b2f6d59
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/issues/RetryWhileStackOverflowIssueTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RetryWhileStackOverflowIssueTest extends ContextTestSupport {
+
+    private static final boolean PRINT_STACK_TRACE = false;
+
+    @Test
+    public void testRetry() throws Exception {
+        getMockEndpoint("mock:error").expectedMessageCount(1);
+        
getMockEndpoint("mock:error").message(0).body().isInstanceOf(MyCoolDude.class);
+
+        MyCoolDude dude = new MyCoolDude();
+        template.sendBody("direct:start", dude);
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1000 + 1, dude.getCounter());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
onException(IllegalArgumentException.class).retryWhile(simple("${body.areWeCool}
 == 'no'")).redeliveryDelay(0)
+                        .handled(true).to("mock:error");
+
+                from("direct:start")
+                        //                        .transacted()
+                        .throwException(new 
IllegalArgumentException("Forced"));
+            }
+        };
+    }
+
+    public static class MyCoolDude {
+
+        private int counter;
+
+        public String areWeCool() {
+            int size = currentStackSize();
+            System.out.println("Stacksize: " + size);
+            if (counter++ < 1000) {
+                return "no";
+            } else {
+                return "yes";
+            }
+        }
+
+        public int getCounter() {
+            return counter;
+        }
+    }
+
+    private static int currentStackSize() {
+        int depth = Thread.currentThread().getStackTrace().length;
+        if (PRINT_STACK_TRACE) {
+            new Throwable("Printing Stacktrace depth: " + 
depth).printStackTrace(System.err);
+        }
+        return depth;
+    }
+
+}

Reply via email to