This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch split-stuck
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 96b93b0173722ada506ef161534f6abab98c00ae
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Jun 28 11:53:16 2024 +0200

    CAMEL-16829: camel-core - Stuck processing with nested parallel splits and custom thread pool
---
 .../apache/camel/processor/MulticastProcessor.java | 27 ++++++-
 .../AggregateParallelThreadPoolExhaustedTest.java  | 90 ++++++++++++++++++++++
 .../SplitParallelThreadPoolExhaustedTest.java      | 89 +++++++++++++++++++++
 .../camel/support/DefaultThreadPoolFactory.java    |  2 +
 .../ROOT/pages/camel-4x-upgrade-guide-4_7.adoc     |  5 ++
 5 files changed, 209 insertions(+), 4 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 3aa6ea63069..fb9586865c0 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -75,6 +76,7 @@ import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.concurrent.AsyncCompletionService;
+import org.apache.camel.util.concurrent.Rejectable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -347,7 +349,11 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                 ? new MulticastTransactedTask(exchange, pairs, callback, size)
                 : new MulticastReactiveTask(exchange, pairs, callback, size);
         if (isParallelProcessing()) {
-            executorService.submit(() -> reactiveExecutor.schedule(state));
+            try {
+                executorService.submit(() -> reactiveExecutor.schedule(state));
+            } catch (RejectedExecutionException e) {
+                state.reject();
+            }
         } else {
             if (exchange.isTransacted()) {
                 reactiveExecutor.scheduleQueue(state);
@@ -362,10 +368,16 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         return false;
     }
 
-    protected void schedule(Runnable runnable) {
+    protected void schedule(final Runnable runnable) {
         if (isParallelProcessing()) {
             Runnable task = prepareParallelTask(runnable);
-            executorService.submit(() -> reactiveExecutor.schedule(task));
+            try {
+                executorService.submit(() -> reactiveExecutor.schedule(task));
+            } catch (RejectedExecutionException e) {
+                if (runnable instanceof Rejectable rej) {
+                    rej.reject();
+                }
+            }
         } else {
             reactiveExecutor.schedule(runnable);
         }
@@ -402,7 +414,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         return answer;
     }
 
-    protected abstract class MulticastTask implements Runnable {
+    protected abstract class MulticastTask implements Runnable, Rejectable {
 
         final Exchange original;
         final Iterable<ProcessorExchangePair> pairs;
@@ -527,6 +539,13 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                 MulticastProcessor.this.doDone(original, exchange, pairs, 
callback, false, forceExhaust);
             }
         }
+
+        @Override
+        public void reject() {
+            original.setException(new RejectedExecutionException("Task 
rejected executing from ExecutorService"));
+            // and do the done work
+            doDone(null, false);
+        }
     }
 
     /**
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/AggregateParallelThreadPoolExhaustedTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/AggregateParallelThreadPoolExhaustedTest.java
new file mode 100644
index 00000000000..0635b6d2bc6
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/AggregateParallelThreadPoolExhaustedTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolBuilder;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.parallel.Isolated;
+
+/**
+ * Tests that the Aggregate EIP does not hang-threads due to thread-pools 
being exhausted and rejects new tasks
+ */
+@Isolated
+@Timeout(30)
+public class AggregateParallelThreadPoolExhaustedTest extends 
ContextTestSupport {
+
+    @Test
+    public void testAggregateParallel() throws Exception {
+        AtomicInteger ok = new AtomicInteger();
+        AtomicInteger fail = new AtomicInteger();
+
+        Runnable r = () -> {
+            try {
+                template.sendBody("direct:start", "Body");
+                ok.incrementAndGet();
+            } catch (Exception e) {
+                fail.incrementAndGet();
+            }
+        };
+        var pool = Executors.newFixedThreadPool(10);
+        for (int i = 0; i < 10; i++) {
+            pool.submit(r);
+        }
+
+        // should not complete success as all tasks should be rejected due to 
thread-pool rejecting when no space
+        Awaitility.await().untilAsserted(() -> {
+            Assertions.assertTrue(fail.get() > 0, "Some should fail");
+        });
+
+        log.info("Errors: {}", fail.get());
+        log.info("OK: {}", ok.get());
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                final var executorService = new ThreadPoolBuilder(getContext())
+                        .poolSize(1)
+                        .maxPoolSize(1)
+                        .maxQueueSize(0)
+                        .build("inner");
+
+                from("direct:start")
+                        .to("direct:inner?synchronous=true");
+
+                from("direct:inner")
+                        .aggregate(constant(true), new 
BodyInAggregatingStrategy()).executorService(executorService)
+                        .completionSize(2)
+                        // force delay so threads are not free in the pool 
causing tasks to be rejected
+                        .delay(5000)
+                        .log("${body}");
+            }
+        };
+    }
+
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelThreadPoolExhaustedTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelThreadPoolExhaustedTest.java
new file mode 100644
index 00000000000..57fc06b7611
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelThreadPoolExhaustedTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolBuilder;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.parallel.Isolated;
+
+/**
+ * Tests that the Split EIP does not hang-threads due to thread-pools being 
exhausted and rejects new tasks
+ */
+@Isolated
+@Timeout(30)
+public class SplitParallelThreadPoolExhaustedTest extends ContextTestSupport {
+
+    @Test
+    public void testSplitParallel() throws Exception {
+        AtomicInteger ok = new AtomicInteger();
+        AtomicInteger fail = new AtomicInteger();
+
+        Runnable r = () -> {
+            try {
+                template.sendBody("direct:start", List.of(List.of("0-0", 
"0-1"), List.of("1-0", "1-1")));
+                ok.incrementAndGet();
+            } catch (Exception e) {
+                fail.incrementAndGet();
+            }
+        };
+        var pool = Executors.newFixedThreadPool(10);
+        for (int i = 0; i < 10; i++) {
+            pool.submit(r);
+        }
+
+        // should not complete success as all tasks should be rejected due to 
thread-pool rejecting when no space
+        Awaitility.await().untilAsserted(() -> {
+            Assertions.assertEquals(10, fail.get(), "All should fail");
+        });
+
+        log.info("Errors: {}", fail.get());
+        log.info("OK: {}", ok.get());
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                final var executorService = new ThreadPoolBuilder(getContext())
+                        .poolSize(1)
+                        .maxPoolSize(1)
+                        .maxQueueSize(0)
+                        .build("inner");
+
+                from("direct:start")
+                        .split(body()).parallelProcessing()
+                        .to("direct:inner?synchronous=true");
+
+                from("direct:inner")
+                        .split(body()).executorService(executorService)
+                        .log("${body}");
+            }
+        };
+    }
+
+}
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
index aa9c68845da..ef1c0befd8d 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
@@ -101,6 +101,8 @@ public class DefaultThreadPoolFactory extends 
ServiceSupport implements CamelCon
         } else if (maxQueueSize <= 0) {
             // use a synchronous queue for direct-handover (no tasks stored on 
the queue)
             workQueue = new SynchronousQueue<>();
+            // force pool to reject tasks if no room (as otherwise threads can 
get stuck with reactive routing-engine)
+            rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
         } else {
             // bounded task queue to store tasks on the queue
             workQueue = new LinkedBlockingQueue<>(maxQueueSize);
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_7.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_7.adoc
index 4c78cfff977..3d725b94d3d 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_7.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_7.adoc
@@ -21,6 +21,11 @@ Add default values to `ThrottlingExceptionRoutePolicy` route 
policy.
 The `EndpointRegistry` interface has been slightly changed to now directly 
extends `Map<NormalizedEndpointUri, Endpoint>` 
 instead of being a parameterized type. This may cause some compilation 
failures if the code is declaring a variable for the registry.
 
+When using thread pools with no queue (`maxQueueSize=0`), the rejection policy is now always `Abort`, instead of `CallerRuns`.
+This is necessary to ensure the thread pool cannot cause _stuck threads_ in the Camel routing engine. Previously, when the thread pool had no free thread,
+the current thread was _piggy-backed_ (due to `CallerRuns`) to process the task, which could leave the routing engine
+unable to continue other tasks that were forced to wait.
+
 === camel-health
 
 Routes which has are set to **not** auto-startup are reported as UP in 
health-checks.

Reply via email to