CAMEL-9037: When using defaultTaskExecutorType, the thread pool should 
be shut down when the DMLC is stopping.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a0dac50e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a0dac50e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a0dac50e

Branch: refs/heads/camel-2.15.x
Commit: a0dac50e7fcbb2abdb23271d918b3656e356266f
Parents: 18d7bfc
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Jul 31 13:07:01 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jul 31 13:07:28 2015 +0200

----------------------------------------------------------------------
 .../jms/DefaultJmsMessageListenerContainer.java | 36 ++++++++++++++------
 .../component/jms/DefaultTaskExecutorType.java  | 13 +++----
 .../camel/component/jms/JmsConfiguration.java   |  1 +
 3 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
index ba3282e..63e5382 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
@@ -36,6 +36,7 @@ public class DefaultJmsMessageListenerContainer extends 
DefaultMessageListenerCo
 
     private final JmsEndpoint endpoint;
     private final boolean allowQuickStop;
+    private volatile TaskExecutor taskExecutor;
 
     public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint) {
         this(endpoint, true);
@@ -96,23 +97,28 @@ public class DefaultJmsMessageListenerContainer extends 
DefaultMessageListenerCo
         String pattern = 
endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern();
         String beanName = getBeanName() == null ? endpoint.getThreadName() : 
getBeanName();
 
+        TaskExecutor answer;
+
         if (endpoint.getDefaultTaskExecutorType() == 
DefaultTaskExecutorType.ThreadPool) {
-            ThreadPoolTaskExecutor answer = new ThreadPoolTaskExecutor();
-            answer.setBeanName(beanName);
-            answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, 
true));
-            answer.setCorePoolSize(endpoint.getConcurrentConsumers());
+            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+            executor.setBeanName(beanName);
+            executor.setThreadFactory(new CamelThreadFactory(pattern, 
beanName, true));
+            executor.setCorePoolSize(endpoint.getConcurrentConsumers());
             // Direct hand-off mode. Do not queue up tasks: assign it to a 
thread immediately.
             // We set no upper-bound on the thread pool (no maxPoolSize) as 
it's already implicitly constrained by
             // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only 
grow up to a level of concurrency as
             // defined by maxConcurrentConsumers).
-            answer.setQueueCapacity(0);
-            answer.initialize();
-            return answer;
+            executor.setQueueCapacity(0);
+            executor.initialize();
+            answer = executor;
         } else {
-            SimpleAsyncTaskExecutor answer = new 
SimpleAsyncTaskExecutor(beanName);
-            answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, 
true));
-            return answer;
+            SimpleAsyncTaskExecutor executor = new 
SimpleAsyncTaskExecutor(beanName);
+            executor.setThreadFactory(new CamelThreadFactory(pattern, 
beanName, true));
+            answer = executor;
         }
+
+        taskExecutor = answer;
+        return answer;
     }
 
     @Override
@@ -122,6 +128,11 @@ public class DefaultJmsMessageListenerContainer extends 
DefaultMessageListenerCo
                     + " and sharedConnectionEnabled: " + 
sharedConnectionEnabled());
         }
         super.stop();
+
+        if (taskExecutor instanceof ThreadPoolTaskExecutor) {
+            ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) 
taskExecutor;
+            executor.destroy();
+        }
     }
 
     @Override
@@ -131,6 +142,11 @@ public class DefaultJmsMessageListenerContainer extends 
DefaultMessageListenerCo
                     + " and sharedConnectionEnabled: " + 
sharedConnectionEnabled());
         }
         super.destroy();
+
+        if (taskExecutor instanceof ThreadPoolTaskExecutor) {
+            ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) 
taskExecutor;
+            executor.destroy();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
index aa6166a..1022531 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
@@ -23,6 +23,7 @@ import 
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 /**
  * Hints what type of default task executor our {@link 
DefaultJmsMessageListenerContainer} should use.
+ *
  * @since 2.10.3
  */
 public enum DefaultTaskExecutorType {
@@ -30,13 +31,13 @@ public enum DefaultTaskExecutorType {
     /**
      * Use a {@link ThreadPoolTaskExecutor} as the underlying task executor 
for consuming messages.
      * It will be configured with these attributes:
-     * <p />
+     * <p/>
      * <li>
-     * <ul>{@code corePoolSize} = concurrentConsumers</ul>
-     * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for 
growing the thread pool,
-     * see Javadoc of {@link ThreadPoolExecutor}.</ul>
-     * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as 
concurrency should be limited by
-     * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul>
+     *   <ul>{@code corePoolSize} = concurrentConsumers</ul>
+     *   <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for 
growing the thread pool,
+     *       see Javadoc of {@link ThreadPoolExecutor}.</ul>
+     *   <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as 
concurrency should be limited by
+     *       the endpoint's maxConcurrentConsumers, not by the thread 
pool).</ul>
      * </li>
      */
     ThreadPool,

http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 8c6c5fe..7c5e151 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -199,6 +199,7 @@ public class JmsConfiguration implements Cloneable {
     private MessageListenerContainerFactory messageListenerContainerFactory;
     @UriParam
     private boolean includeSentJMSMessageID;
+    @UriParam
     private DefaultTaskExecutorType defaultTaskExecutorType;
     @UriParam
     private boolean includeAllJMSXProperties;

Reply via email to