This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch split-stuck in repository https://gitbox.apache.org/repos/asf/camel.git
commit 96b93b0173722ada506ef161534f6abab98c00ae Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jun 28 11:53:16 2024 +0200 CAMEL-16829: camel-core - Stuck processing with nested parallel splits and custom thread pool --- .../apache/camel/processor/MulticastProcessor.java | 27 ++++++- .../AggregateParallelThreadPoolExhaustedTest.java | 90 ++++++++++++++++++++++ .../SplitParallelThreadPoolExhaustedTest.java | 89 +++++++++++++++++++++ .../camel/support/DefaultThreadPoolFactory.java | 2 + .../ROOT/pages/camel-4x-upgrade-guide-4_7.adoc | 5 ++ 5 files changed, 209 insertions(+), 4 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 3aa6ea63069..fb9586865c0 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -75,6 +76,7 @@ import org.apache.camel.util.CastUtils; import org.apache.camel.util.IOHelper; import org.apache.camel.util.StopWatch; import org.apache.camel.util.concurrent.AsyncCompletionService; +import org.apache.camel.util.concurrent.Rejectable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -347,7 +349,11 @@ public class MulticastProcessor extends AsyncProcessorSupport ? 
new MulticastTransactedTask(exchange, pairs, callback, size) : new MulticastReactiveTask(exchange, pairs, callback, size); if (isParallelProcessing()) { - executorService.submit(() -> reactiveExecutor.schedule(state)); + try { + executorService.submit(() -> reactiveExecutor.schedule(state)); + } catch (RejectedExecutionException e) { + state.reject(); + } } else { if (exchange.isTransacted()) { reactiveExecutor.scheduleQueue(state); @@ -362,10 +368,16 @@ public class MulticastProcessor extends AsyncProcessorSupport return false; } - protected void schedule(Runnable runnable) { + protected void schedule(final Runnable runnable) { if (isParallelProcessing()) { Runnable task = prepareParallelTask(runnable); - executorService.submit(() -> reactiveExecutor.schedule(task)); + try { + executorService.submit(() -> reactiveExecutor.schedule(task)); + } catch (RejectedExecutionException e) { + if (runnable instanceof Rejectable rej) { + rej.reject(); + } + } } else { reactiveExecutor.schedule(runnable); } @@ -402,7 +414,7 @@ public class MulticastProcessor extends AsyncProcessorSupport return answer; } - protected abstract class MulticastTask implements Runnable { + protected abstract class MulticastTask implements Runnable, Rejectable { final Exchange original; final Iterable<ProcessorExchangePair> pairs; @@ -527,6 +539,13 @@ public class MulticastProcessor extends AsyncProcessorSupport MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust); } } + + @Override + public void reject() { + original.setException(new RejectedExecutionException("Task rejected executing from ExecutorService")); + // and do the done work + doDone(null, false); + } } /** diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/AggregateParallelThreadPoolExhaustedTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/AggregateParallelThreadPoolExhaustedTest.java new file mode 100644 index 00000000000..0635b6d2bc6 --- /dev/null +++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/AggregateParallelThreadPoolExhaustedTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.builder.ThreadPoolBuilder; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.parallel.Isolated; + +/** + * Tests that the Aggregate EIP does not hang-threads due to thread-pools being exhausted and rejects new tasks + */ +@Isolated +@Timeout(30) +public class AggregateParallelThreadPoolExhaustedTest extends ContextTestSupport { + + @Test + public void testAggregateParallel() throws Exception { + AtomicInteger ok = new AtomicInteger(); + AtomicInteger fail = new AtomicInteger(); + + Runnable r = () -> { + try { + template.sendBody("direct:start", "Body"); + ok.incrementAndGet(); + } catch (Exception e) { + fail.incrementAndGet(); + } + 
}; + var pool = Executors.newFixedThreadPool(10); + for (int i = 0; i < 10; i++) { + pool.submit(r); + } + + // should not complete success as all tasks should be rejected due to thread-pool rejecting when no space + Awaitility.await().untilAsserted(() -> { + Assertions.assertTrue(fail.get() > 0, "Some should fail"); + }); + + log.info("Errors: {}", fail.get()); + log.info("OK: {}", ok.get()); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + final var executorService = new ThreadPoolBuilder(getContext()) + .poolSize(1) + .maxPoolSize(1) + .maxQueueSize(0) + .build("inner"); + + from("direct:start") + .to("direct:inner?synchronous=true"); + + from("direct:inner") + .aggregate(constant(true), new BodyInAggregatingStrategy()).executorService(executorService) + .completionSize(2) + // force delay so threads are not free in the pool causing tasks to be rejected + .delay(5000) + .log("${body}"); + } + }; + } + +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelThreadPoolExhaustedTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelThreadPoolExhaustedTest.java new file mode 100644 index 00000000000..57fc06b7611 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelThreadPoolExhaustedTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.builder.ThreadPoolBuilder; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.parallel.Isolated; + +/** + * Tests that the Split EIP does not hang-threads due to thread-pools being exhausted and rejects new tasks + */ +@Isolated +@Timeout(30) +public class SplitParallelThreadPoolExhaustedTest extends ContextTestSupport { + + @Test + public void testSplitParallel() throws Exception { + AtomicInteger ok = new AtomicInteger(); + AtomicInteger fail = new AtomicInteger(); + + Runnable r = () -> { + try { + template.sendBody("direct:start", List.of(List.of("0-0", "0-1"), List.of("1-0", "1-1"))); + ok.incrementAndGet(); + } catch (Exception e) { + fail.incrementAndGet(); + } + }; + var pool = Executors.newFixedThreadPool(10); + for (int i = 0; i < 10; i++) { + pool.submit(r); + } + + // should not complete success as all tasks should be rejected due to thread-pool rejecting when no space + Awaitility.await().untilAsserted(() -> { + Assertions.assertEquals(10, fail.get(), "All should fail"); + }); + + log.info("Errors: {}", fail.get()); + log.info("OK: {}", ok.get()); + } + + @Override + protected RoutesBuilder 
createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + final var executorService = new ThreadPoolBuilder(getContext()) + .poolSize(1) + .maxPoolSize(1) + .maxQueueSize(0) + .build("inner"); + + from("direct:start") + .split(body()).parallelProcessing() + .to("direct:inner?synchronous=true"); + + from("direct:inner") + .split(body()).executorService(executorService) + .log("${body}"); + } + }; + } + +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java index aa9c68845da..ef1c0befd8d 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java @@ -101,6 +101,8 @@ public class DefaultThreadPoolFactory extends ServiceSupport implements CamelCon } else if (maxQueueSize <= 0) { // use a synchronous queue for direct-handover (no tasks stored on the queue) workQueue = new SynchronousQueue<>(); + // force pool to reject tasks if no room (as otherwise threads can get stuck with reactive routing-engine) + rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); } else { // bounded task queue to store tasks on the queue workQueue = new LinkedBlockingQueue<>(maxQueueSize); diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_7.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_7.adoc index 4c78cfff977..3d725b94d3d 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_7.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_7.adoc @@ -21,6 +21,11 @@ Add default values to `ThrottlingExceptionRoutePolicy` route policy. 
The `EndpointRegistry` interface has been slightly changed to now directly extends `Map<NormalizedEndpointUri, Endpoint>` instead of being a parameterized type. This may cause some compilation failures if the code is declaring a variable for the registry. +When using thread pools with no task queue (`maxQueueSize=0`), the rejection policy is now always `Abort` instead of `CallerRuns`. +This is necessary to ensure that the thread pool cannot cause _stuck threads_ in the Camel routing engine. Previously, when the thread pool did not +have any free thread, the current thread was _piggy-backed_ (due to `CallerRuns`) to process the task, which could leave the routing engine +unable to continue other pending tasks that were forced to wait. + === camel-health Routes which has are set to **not** auto-startup are reported as UP in health-checks.