This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 340407b CAMEL-14591: Fixed thread pool for recipinent list EIP should only be created when really needed (timeout enabled) and that the pool is also shutdown. 340407b is described below commit 340407b2556bf3e58178b6d1c749e393a12b6898 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Feb 19 12:01:49 2020 +0100 CAMEL-14591: Fixed thread pool for recipinent list EIP should only be created when really needed (timeout enabled) and that the pool is also shutdown. --- .../apache/camel/processor/MulticastProcessor.java | 16 +++++++++++++-- .../org/apache/camel/processor/RecipientList.java | 23 +++++++++++----------- .../java/org/apache/camel/processor/Splitter.java | 1 - .../camel/processor/RecipientListNoCacheTest.java | 5 +++++ .../RecipientListParallelTimeoutTest.java | 22 ++++++++++++++++++++- 5 files changed, 51 insertions(+), 16 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index dd2ae22..ccd9d01 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -161,6 +161,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat private final ExecutorService executorService; private final boolean shutdownExecutorService; private ExecutorService aggregateExecutorService; + private boolean shutdownAggregateExecutorService; private final long timeout; private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<>(); private final boolean shareUnitOfWork; @@ -802,12 +803,13 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat if (isParallelProcessing() && executorService == null) { throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set"); } - if (aggregateExecutorService == null) { + if (timeout > 0 && aggregateExecutorService == null) { // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run // and signal completion during processing, which would lead to what would appear as a dead-lock or a slow processing String name = getClass().getSimpleName() + "-AggregateTask"; aggregateExecutorService = createAggregateExecutorService(name); + shutdownAggregateExecutorService = true; } if (aggregationStrategy instanceof CamelContextAware) { ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext); @@ -842,7 +844,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat if (shutdownExecutorService && executorService != null) { getCamelContext().getExecutorServiceManager().shutdownNow(executorService); } - if (aggregateExecutorService != null) { + if (shutdownAggregateExecutorService && aggregateExecutorService != null) { getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService); } } @@ -968,6 +970,16 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat return shareUnitOfWork; } + public ExecutorService getAggregateExecutorService() { + return aggregateExecutorService; + } + + public void setAggregateExecutorService(ExecutorService aggregateExecutorService) { + this.aggregateExecutorService = aggregateExecutorService; + // we use a custom executor so do not shutdown + this.shutdownAggregateExecutorService = false; + } + @Override public List<Processor> next() { if (!hasNext()) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java index e913ffd..4903429 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java @@ -72,7 +72,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou private boolean shareUnitOfWork; private ExecutorService executorService; private boolean shutdownExecutorService; - private ExecutorService aggregateExecutorService; + private volatile ExecutorService aggregateExecutorService; private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy(); public RecipientList(CamelContext camelContext) { @@ -190,21 +190,13 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(), isParallelProcessing(), getExecutorService(), isShutdownExecutorService(), isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(), - isStopOnAggregateException()) { - @Override - protected synchronized ExecutorService createAggregateExecutorService(String name) { - // use a shared executor service to avoid creating new thread pools - if (aggregateExecutorService == null) { - aggregateExecutorService = super.createAggregateExecutorService("RecipientList-AggregateTask"); - } - return aggregateExecutorService; - } - }; + isStopOnAggregateException()); + rlp.setAggregateExecutorService(aggregateExecutorService); rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints()); rlp.setId(getId()); rlp.setRouteId(getRouteId()); - // start the service + // start ourselves try { ServiceHelper.startService(rlp); } catch (Exception e) { @@ -232,6 +224,10 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize); } } + if (timeout > 0) { + // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in + aggregateExecutorService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "RecipientList-AggregateTask", 0); + } ServiceHelper.startService(aggregationStrategy, producerCache); } @@ -244,6 +240,9 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou protected void doShutdown() throws Exception { ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy); + if (aggregateExecutorService != null) { + camelContext.getExecutorServiceManager().shutdownNow(aggregateExecutorService); + } if (shutdownExecutorService && executorService != null) { camelContext.getExecutorServiceManager().shutdownNow(executorService); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java index 0f09d12..491358c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java @@ -81,7 +81,6 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac public boolean process(Exchange exchange, final AsyncCallback callback) { AggregationStrategy strategy = getAggregationStrategy(); - // set original exchange if not already pre-configured if (strategy instanceof UseOriginalAggregationStrategy) { // need to create a new private instance, as we can also have concurrency issue so we cannot store state diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java index 2660405..f4a9575 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java @@ -17,6 +17,7 @@ package org.apache.camel.processor; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.camel.ContextTestSupport; import org.apache.camel.Processor; @@ -52,6 +53,10 @@ public class RecipientListNoCacheTest extends ContextTestSupport { Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl); assertNotNull(pc); assertIsInstanceOf(EmptyProducerCache.class, pc); + + // and no thread pool is in use as timeout is 0 + pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl); + assertNull(pc); } protected void sendBody(String body) { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java index 84458dd..b93b50d 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java @@ -16,11 +16,16 @@ */ package org.apache.camel.processor; +import java.util.List; +import java.util.concurrent.ExecutorService; + import org.apache.camel.AggregationStrategy; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.ReflectionHelper; import org.junit.Test; public class RecipientListParallelTimeoutTest extends ContextTestSupport { @@ -34,6 +39,21 @@ public class RecipientListParallelTimeoutTest extends ContextTestSupport { template.sendBodyAndHeader("direct:start", "Hello", "slip", "direct:a,direct:b,direct:c"); assertMockEndpointsSatisfied(); + + // make sure that the thread pool will be shutdown + List<Processor> list = context.getRoute("route1").filter("foo"); + RecipientList rl = (RecipientList) list.get(0); + assertNotNull(rl); + + Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl); + assertNotNull(pc); + ExecutorService es = assertIsInstanceOf(ExecutorService.class, pc); + + assertFalse(es.isShutdown()); + + // now stop camel and ensure the thread pool is stopped + context.stop(); + assertTrue(es.isShutdown()); } @Override @@ -51,7 +71,7 @@ public class RecipientListParallelTimeoutTest extends ContextTestSupport { oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } - }).parallelProcessing().timeout(500).to("mock:result"); + }).parallelProcessing().timeout(500).id("foo").to("mock:result"); from("direct:a").delay(1000).setBody(constant("A"));