Repository: camel Updated Branches: refs/heads/camel-2.15.x 18d7bfca7 -> a0dac50e7 refs/heads/master 6a62f9ae2 -> 0d24370a7
CAMEL-9037: When using defaultTaskExecutorType then the thread pool should shutdown when the DMLC is stopping. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0d24370a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0d24370a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0d24370a Branch: refs/heads/master Commit: 0d24370a70ef6e809228c48c6e2bd2519251d5fd Parents: 6a62f9a Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jul 31 13:07:01 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 31 13:07:01 2015 +0200 ---------------------------------------------------------------------- .../jms/DefaultJmsMessageListenerContainer.java | 36 ++++++++++++++------ .../component/jms/DefaultTaskExecutorType.java | 13 +++---- .../camel/component/jms/JmsConfiguration.java | 1 + 3 files changed, 34 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0d24370a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java index ba3282e..63e5382 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java @@ -36,6 +36,7 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo private final JmsEndpoint endpoint; private final boolean allowQuickStop; + private volatile TaskExecutor taskExecutor; public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint) { this(endpoint, true); @@ -96,23 +97,28 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo String pattern = endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern(); String beanName = getBeanName() == null ? endpoint.getThreadName() : getBeanName(); + TaskExecutor answer; + if (endpoint.getDefaultTaskExecutorType() == DefaultTaskExecutorType.ThreadPool) { - ThreadPoolTaskExecutor answer = new ThreadPoolTaskExecutor(); - answer.setBeanName(beanName); - answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); - answer.setCorePoolSize(endpoint.getConcurrentConsumers()); + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setBeanName(beanName); + executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); + executor.setCorePoolSize(endpoint.getConcurrentConsumers()); // Direct hand-off mode. Do not queue up tasks: assign it to a thread immediately. // We set no upper-bound on the thread pool (no maxPoolSize) as it's already implicitly constrained by // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only grow up to a level of concurrency as // defined by maxConcurrentConsumers). - answer.setQueueCapacity(0); - answer.initialize(); - return answer; + executor.setQueueCapacity(0); + executor.initialize(); + answer = executor; } else { - SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName); - answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); - return answer; + SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(beanName); + executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); + answer = executor; } + + taskExecutor = answer; + return answer; } @Override @@ -122,6 +128,11 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo + " and sharedConnectionEnabled: " + sharedConnectionEnabled()); } super.stop(); + + if (taskExecutor instanceof ThreadPoolTaskExecutor) { + ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor; + executor.destroy(); + } } @Override @@ -131,6 +142,11 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo + " and sharedConnectionEnabled: " + sharedConnectionEnabled()); } super.destroy(); + + if (taskExecutor instanceof ThreadPoolTaskExecutor) { + ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor; + executor.destroy(); + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/0d24370a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java index aa6166a..1022531 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java @@ -23,6 +23,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * Hints what type of default task executor our {@link DefaultJmsMessageListenerContainer} should use. + * * @since 2.10.3 */ public enum DefaultTaskExecutorType { @@ -30,13 +31,13 @@ public enum DefaultTaskExecutorType { /** * Use a {@link ThreadPoolTaskExecutor} as the underlying task executor for consuming messages. * It will be configured with these attributes: - * <p /> + * <p/> * <li> - * <ul>{@code corePoolSize} = concurrentConsumers</ul> - * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for growing the thread pool, - * see Javadoc of {@link ThreadPoolExecutor}.</ul> - * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as concurrency should be limited by - * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul> + * <ul>{@code corePoolSize} = concurrentConsumers</ul> + * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for growing the thread pool, + * see Javadoc of {@link ThreadPoolExecutor}.</ul> + * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as concurrency should be limited by + * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul> * </li> */ ThreadPool, http://git-wip-us.apache.org/repos/asf/camel/blob/0d24370a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index c670c35..fa9d38e 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -204,6 +204,7 @@ public class JmsConfiguration implements Cloneable { private MessageListenerContainerFactory messageListenerContainerFactory; @UriParam private boolean includeSentJMSMessageID; + @UriParam private DefaultTaskExecutorType defaultTaskExecutorType; @UriParam private boolean includeAllJMSXProperties;