This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 340407b  CAMEL-14591: Fixed thread pool for recipient list EIP should 
only be created when really needed (timeout enabled) and that the pool is also 
shutdown.
340407b is described below

commit 340407b2556bf3e58178b6d1c749e393a12b6898
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Feb 19 12:01:49 2020 +0100

    CAMEL-14591: Fixed thread pool for recipient list EIP should only be 
created when really needed (timeout enabled) and that the pool is also shutdown.
---
 .../apache/camel/processor/MulticastProcessor.java | 16 +++++++++++++--
 .../org/apache/camel/processor/RecipientList.java  | 23 +++++++++++-----------
 .../java/org/apache/camel/processor/Splitter.java  |  1 -
 .../camel/processor/RecipientListNoCacheTest.java  |  5 +++++
 .../RecipientListParallelTimeoutTest.java          | 22 ++++++++++++++++++++-
 5 files changed, 51 insertions(+), 16 deletions(-)

diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index dd2ae22..ccd9d01 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -161,6 +161,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
     private final ExecutorService executorService;
     private final boolean shutdownExecutorService;
     private ExecutorService aggregateExecutorService;
+    private boolean shutdownAggregateExecutorService;
     private final long timeout;
     private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers 
= new ConcurrentHashMap<>();
     private final boolean shareUnitOfWork;
@@ -802,12 +803,13 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         if (isParallelProcessing() && executorService == null) {
             throw new IllegalArgumentException("ParallelProcessing is enabled 
but ExecutorService has not been set");
         }
-        if (aggregateExecutorService == null) {
+        if (timeout > 0 && aggregateExecutorService == null) {
             // use unbounded thread pool so we ensure the aggregate on-the-fly 
task always will have assigned a thread
             // and run the tasks when the task is submitted. If not then the 
aggregate task may not be able to run
             // and signal completion during processing, which would lead to 
what would appear as a dead-lock or a slow processing
             String name = getClass().getSimpleName() + "-AggregateTask";
             aggregateExecutorService = createAggregateExecutorService(name);
+            shutdownAggregateExecutorService = true;
         }
         if (aggregationStrategy instanceof CamelContextAware) {
             ((CamelContextAware) 
aggregationStrategy).setCamelContext(camelContext);
@@ -842,7 +844,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         if (shutdownExecutorService && executorService != null) {
             
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
         }
-        if (aggregateExecutorService != null) {
+        if (shutdownAggregateExecutorService && aggregateExecutorService != 
null) {
             
getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService);
         }
     }
@@ -968,6 +970,16 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         return shareUnitOfWork;
     }
 
+    public ExecutorService getAggregateExecutorService() {
+        return aggregateExecutorService;
+    }
+
+    public void setAggregateExecutorService(ExecutorService 
aggregateExecutorService) {
+        this.aggregateExecutorService = aggregateExecutorService;
+        // we use a custom executor so do not shutdown
+        this.shutdownAggregateExecutorService = false;
+    }
+
     @Override
     public List<Processor> next() {
         if (!hasNext()) {
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java 
b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
index e913ffd..4903429 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -72,7 +72,7 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
     private boolean shareUnitOfWork;
     private ExecutorService executorService;
     private boolean shutdownExecutorService;
-    private ExecutorService aggregateExecutorService;
+    private volatile ExecutorService aggregateExecutorService;
     private AggregationStrategy aggregationStrategy = new 
UseLatestAggregationStrategy();
 
     public RecipientList(CamelContext camelContext) {
@@ -190,21 +190,13 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
         RecipientListProcessor rlp = new 
RecipientListProcessor(exchange.getContext(), producerCache, iter, 
getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), 
isShutdownExecutorService(),
                 isStreaming(), isStopOnException(), getTimeout(), 
getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
-                isStopOnAggregateException()) {
-            @Override
-            protected synchronized ExecutorService 
createAggregateExecutorService(String name) {
-                // use a shared executor service to avoid creating new thread 
pools
-                if (aggregateExecutorService == null) {
-                    aggregateExecutorService = 
super.createAggregateExecutorService("RecipientList-AggregateTask");
-                }
-                return aggregateExecutorService;
-            }
-        };
+                isStopOnAggregateException());
+        rlp.setAggregateExecutorService(aggregateExecutorService);
         rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
         rlp.setId(getId());
         rlp.setRouteId(getRouteId());
 
-        // start the service
+        // start ourselves
         try {
             ServiceHelper.startService(rlp);
         } catch (Exception e) {
@@ -232,6 +224,10 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
                 LOG.debug("RecipientList {} using ProducerCache with 
cacheSize={}", this, cacheSize);
             }
         }
+        if (timeout > 0) {
+            // use a cached thread pool so each on-the-fly task has a 
dedicated thread to process completions as they come in
+            aggregateExecutorService = 
camelContext.getExecutorServiceManager().newScheduledThreadPool(this, 
"RecipientList-AggregateTask", 0);
+        }
         ServiceHelper.startService(aggregationStrategy, producerCache);
     }
 
@@ -244,6 +240,9 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
     protected void doShutdown() throws Exception {
         ServiceHelper.stopAndShutdownServices(producerCache, 
aggregationStrategy);
 
+        if (aggregateExecutorService != null) {
+            
camelContext.getExecutorServiceManager().shutdownNow(aggregateExecutorService);
+        }
         if (shutdownExecutorService && executorService != null) {
             
camelContext.getExecutorServiceManager().shutdownNow(executorService);
         }
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java 
b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
index 0f09d12..491358c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
@@ -81,7 +81,6 @@ public class Splitter extends MulticastProcessor implements 
AsyncProcessor, Trac
     public boolean process(Exchange exchange, final AsyncCallback callback) {
         AggregationStrategy strategy = getAggregationStrategy();
 
-
         // set original exchange if not already pre-configured
         if (strategy instanceof UseOriginalAggregationStrategy) {
             // need to create a new private instance, as we can also have 
concurrency issue so we cannot store state
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index 2660405..f4a9575 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Processor;
@@ -52,6 +53,10 @@ public class RecipientListNoCacheTest extends 
ContextTestSupport {
         Object pc = 
ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
         assertNotNull(pc);
         assertIsInstanceOf(EmptyProducerCache.class, pc);
+
+        // and no thread pool is in use as timeout is 0
+        pc = 
ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"),
 rl);
+        assertNull(pc);
     }
 
     protected void sendBody(String body) {
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
index 84458dd..b93b50d 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
@@ -16,11 +16,16 @@
  */
 package org.apache.camel.processor;
 
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.ReflectionHelper;
 import org.junit.Test;
 
 public class RecipientListParallelTimeoutTest extends ContextTestSupport {
@@ -34,6 +39,21 @@ public class RecipientListParallelTimeoutTest extends 
ContextTestSupport {
         template.sendBodyAndHeader("direct:start", "Hello", "slip", 
"direct:a,direct:b,direct:c");
 
         assertMockEndpointsSatisfied();
+
+        // make sure that the thread pool will be shutdown
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        RecipientList rl = (RecipientList) list.get(0);
+        assertNotNull(rl);
+
+        Object pc = 
ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"),
 rl);
+        assertNotNull(pc);
+        ExecutorService es = assertIsInstanceOf(ExecutorService.class, pc);
+
+        assertFalse(es.isShutdown());
+
+        // now stop camel and ensure the thread pool is stopped
+        context.stop();
+        assertTrue(es.isShutdown());
     }
 
     @Override
@@ -51,7 +71,7 @@ public class RecipientListParallelTimeoutTest extends 
ContextTestSupport {
                         oldExchange.getIn().setBody(body + 
newExchange.getIn().getBody(String.class));
                         return oldExchange;
                     }
-                }).parallelProcessing().timeout(500).to("mock:result");
+                
}).parallelProcessing().timeout(500).id("foo").to("mock:result");
 
                 from("direct:a").delay(1000).setBody(constant("A"));
 

Reply via email to