CAMEL-9037: When defaultTaskExecutorType is used, the thread pool should be shut down when the DMLC is stopping.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a0dac50e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a0dac50e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a0dac50e Branch: refs/heads/camel-2.15.x Commit: a0dac50e7fcbb2abdb23271d918b3656e356266f Parents: 18d7bfc Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jul 31 13:07:01 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 31 13:07:28 2015 +0200 ---------------------------------------------------------------------- .../jms/DefaultJmsMessageListenerContainer.java | 36 ++++++++++++++------ .../component/jms/DefaultTaskExecutorType.java | 13 +++---- .../camel/component/jms/JmsConfiguration.java | 1 + 3 files changed, 34 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java index ba3282e..63e5382 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java @@ -36,6 +36,7 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo private final JmsEndpoint endpoint; private final boolean allowQuickStop; + private volatile TaskExecutor taskExecutor; public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint) { this(endpoint, true); @@ -96,23 +97,28 @@ public class DefaultJmsMessageListenerContainer 
extends DefaultMessageListenerCo String pattern = endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern(); String beanName = getBeanName() == null ? endpoint.getThreadName() : getBeanName(); + TaskExecutor answer; + if (endpoint.getDefaultTaskExecutorType() == DefaultTaskExecutorType.ThreadPool) { - ThreadPoolTaskExecutor answer = new ThreadPoolTaskExecutor(); - answer.setBeanName(beanName); - answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); - answer.setCorePoolSize(endpoint.getConcurrentConsumers()); + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setBeanName(beanName); + executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); + executor.setCorePoolSize(endpoint.getConcurrentConsumers()); // Direct hand-off mode. Do not queue up tasks: assign it to a thread immediately. // We set no upper-bound on the thread pool (no maxPoolSize) as it's already implicitly constrained by // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only grow up to a level of concurrency as // defined by maxConcurrentConsumers). 
- answer.setQueueCapacity(0); - answer.initialize(); - return answer; + executor.setQueueCapacity(0); + executor.initialize(); + answer = executor; } else { - SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName); - answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); - return answer; + SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(beanName); + executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); + answer = executor; } + + taskExecutor = answer; + return answer; } @Override @@ -122,6 +128,11 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo + " and sharedConnectionEnabled: " + sharedConnectionEnabled()); } super.stop(); + + if (taskExecutor instanceof ThreadPoolTaskExecutor) { + ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor; + executor.destroy(); + } } @Override @@ -131,6 +142,11 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo + " and sharedConnectionEnabled: " + sharedConnectionEnabled()); } super.destroy(); + + if (taskExecutor instanceof ThreadPoolTaskExecutor) { + ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor; + executor.destroy(); + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java index aa6166a..1022531 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java @@ -23,6 +23,7 @@ import 
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * Hints what type of default task executor our {@link DefaultJmsMessageListenerContainer} should use. + * * @since 2.10.3 */ public enum DefaultTaskExecutorType { @@ -30,13 +31,13 @@ public enum DefaultTaskExecutorType { /** * Use a {@link ThreadPoolTaskExecutor} as the underlying task executor for consuming messages. * It will be configured with these attributes: - * <p /> + * <p/> * <li> - * <ul>{@code corePoolSize} = concurrentConsumers</ul> - * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for growing the thread pool, - * see Javadoc of {@link ThreadPoolExecutor}.</ul> - * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as concurrency should be limited by - * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul> + * <ul>{@code corePoolSize} = concurrentConsumers</ul> + * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for growing the thread pool, + * see Javadoc of {@link ThreadPoolExecutor}.</ul> + * <ul>{@code maxPoolSize}, default value, i.e. 
no upper bound (as concurrency should be limited by + * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul> * </li> */ ThreadPool, http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index 8c6c5fe..7c5e151 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -199,6 +199,7 @@ public class JmsConfiguration implements Cloneable { private MessageListenerContainerFactory messageListenerContainerFactory; @UriParam private boolean includeSentJMSMessageID; + @UriParam private DefaultTaskExecutorType defaultTaskExecutorType; @UriParam private boolean includeAllJMSXProperties;