CAMEL-6439: Use own thread pool for routing exchanges when timeout is triggered when doing request/reply over JMS to not steal the scheduler thread or cause a potential dead-lock.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/47fec206 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/47fec206 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/47fec206 Branch: refs/heads/camel-2.17.x Commit: 47fec2063696156e02644bf0f1740e491a812a60 Parents: 89f6906 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 29 10:29:24 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 29 11:05:57 2016 +0200 ---------------------------------------------------------------------- .../camel/component/jms/JmsComponent.java | 7 +++++ .../camel/component/jms/JmsConfiguration.java | 14 ++++++++++ .../apache/camel/component/jms/JmsEndpoint.java | 5 ++++ .../apache/camel/component/jms/JmsProducer.java | 29 +++++++++++++++++--- .../jms/reply/CorrelationTimeoutMap.java | 28 ++++++++++++++----- .../camel/component/jms/reply/ReplyManager.java | 9 +++++- .../jms/reply/ReplyManagerSupport.java | 15 ++++++++-- 7 files changed, 93 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/47fec206/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java index 4ff90e9..9761bed 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java @@ -391,6 +391,13 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon } /** + * Specifies the maximum number of concurrent consumers for continue routing when timeout occurred when using request/reply over JMS. + */ + public void setReplyOnTimeoutToMaxConcurrentConsumers(int maxConcurrentConsumers) { + getConfiguration().setReplyToOnTimeoutMaxConcurrentConsumers(maxConcurrentConsumers); + } + + /** * The number of messages per task. -1 is unlimited. * If you use a range for concurrent consumers (eg min < max), then this option can be used to set * a value to eg 100 to control how fast the consumers will shrink when less work is required. http://git-wip-us.apache.org/repos/asf/camel/blob/47fec206/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index a4c57e2..7965014 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -183,6 +183,9 @@ public class JmsConfiguration implements Cloneable { description = "Specifies the maximum number of concurrent consumers when using request/reply over JMS." + " See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.") private int replyToMaxConcurrentConsumers; + @UriParam(label = "producer", defaultValue = "1", + description = "Specifies the maximum number of concurrent consumers for continue routing when timeout occurred when using request/reply over JMS.") + private int replyToOnTimeoutMaxConcurrentConsumers = 1; // JmsTemplate only @UriParam(label = "producer", defaultValue = "false", description = "Set if the deliveryMode, priority or timeToLive qualities of service should be used when sending messages." @@ -1084,6 +1087,17 @@ public class JmsConfiguration implements Cloneable { this.replyToMaxConcurrentConsumers = replyToMaxConcurrentConsumers; } + public int getReplyToOnTimeoutMaxConcurrentConsumers() { + return replyToOnTimeoutMaxConcurrentConsumers; + } + + /** + * Specifies the maximum number of concurrent consumers for continue routing when timeout occurred when using request/reply over JMS. + */ + public void setReplyToOnTimeoutMaxConcurrentConsumers(int replyToOnTimeoutMaxConcurrentConsumers) { + this.replyToOnTimeoutMaxConcurrentConsumers = replyToOnTimeoutMaxConcurrentConsumers; + } + public boolean isExplicitQosEnabled() { return explicitQosEnabled != null ? explicitQosEnabled : false; } http://git-wip-us.apache.org/repos/asf/camel/blob/47fec206/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index dface05..92b8301 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -604,6 +604,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy } @ManagedAttribute + public int getReplyToOnTimeoutMaxConcurrentConsumers() { + return getConfiguration().getReplyToOnTimeoutMaxConcurrentConsumers(); + } + + @ManagedAttribute public int getMaxMessagesPerTask() { return getConfiguration().getMaxMessagesPerTask(); } http://git-wip-us.apache.org/repos/asf/camel/blob/47fec206/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java index 94a974a..251e55f 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.jms; +import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -542,8 +543,18 @@ public class JmsProducer extends DefaultAsyncProducer { replyManager.setEndpoint(getEndpoint()); String name = "JmsReplyManagerTimeoutChecker[" + getEndpoint().getEndpointConfiguredDestinationName() + "]"; - ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); - replyManager.setScheduledExecutorService(replyManagerExecutorService); + ScheduledExecutorService replyManagerScheduledExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); + replyManager.setScheduledExecutorService(replyManagerScheduledExecutorService); + + name = "JmsReplyManagerOnTimeout[" + getEndpoint().getEndpointConfiguredDestinationName() + "]"; + // allow the timeout thread to timeout so during normal operation we do not have a idle thread + int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers(); + if (max <= 0) { + throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1"); + } + ExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max); + replyManager.setOnTimeoutExecutorService(replyManagerExecutorService); + ServiceHelper.startService(replyManager); return replyManager; @@ -555,8 +566,18 @@ public class JmsProducer extends DefaultAsyncProducer { replyManager.setEndpoint(getEndpoint()); String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]"; - ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); - replyManager.setScheduledExecutorService(replyManagerExecutorService); + ScheduledExecutorService replyManagerScheduledExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); + replyManager.setScheduledExecutorService(replyManagerScheduledExecutorService); + + name = "JmsReplyManagerOnTimeout[" + replyTo + "]"; + // allow the timeout thread to timeout so during normal operation we do not have a idle thread + int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers(); + if (max <= 0) { + throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1"); + } + ExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max); + replyManager.setOnTimeoutExecutorService(replyManagerExecutorService); + ServiceHelper.startService(replyManager); return replyManager; http://git-wip-us.apache.org/repos/asf/camel/blob/47fec206/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java index e0e6731..d4913ef 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.jms.reply; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.support.DefaultTimeoutMap; @@ -30,9 +31,11 @@ import org.apache.camel.support.DefaultTimeoutMap; public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> { private CorrelationListener listener; + private ExecutorService executorService; - public CorrelationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { + public CorrelationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis, ExecutorService executorService) { super(executor, requestMapPollTimeMillis); + this.executorService = executorService; } public void setListener(CorrelationListener listener) { @@ -49,12 +52,23 @@ public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandle // ignore } - // trigger timeout - try { - value.onTimeout(key); - } catch (Throwable e) { - // must ignore so we ensure we evict the element - log.warn("Error processing onTimeout for correlationID: " + key + " due: " + e.getMessage() + ". This exception is ignored.", e); + final Runnable task = new Runnable() { + @Override + public void run() { + // trigger timeout + try { + value.onTimeout(key); + } catch (Throwable e) { + // must ignore so we ensure we evict the element + log.warn("Error processing onTimeout for correlationID: " + key + " due: " + e.getMessage() + ". This exception is ignored.", e); + } + } + }; + if (executorService != null) { + executorService.submit(task); + } else { + // run task synchronously + task.run(); } // return true to remove the element http://git-wip-us.apache.org/repos/asf/camel/blob/47fec206/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java index e3b65aa..f5d3beb 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.jms.reply; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import javax.jms.Destination; import javax.jms.JMSException; @@ -47,11 +48,17 @@ public interface ReplyManager extends SessionAwareMessageListener { void setReplyTo(Destination replyTo); /** - * Sets the scheduled to use when checking for timeouts (no reply received within a given time period) + * Sets the scheduled thread pool to use when checking for timeouts (no reply received within a given time period) */ void setScheduledExecutorService(ScheduledExecutorService executorService); /** + * Sets the thread pool to use for continue routing {@link Exchange} when a timeout was triggered + * when doing request/reply over JMS. + */ + void setOnTimeoutExecutorService(ExecutorService executorService); + + /** * Gets the reply to queue being used */ Destination getReplyTo(); http://git-wip-us.apache.org/repos/asf/camel/blob/47fec206/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index 759ed3c..a94b7ec 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -17,6 +17,7 @@ package org.apache.camel.component.jms.reply; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -50,7 +51,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl protected final Logger log = LoggerFactory.getLogger(getClass()); protected final CamelContext camelContext; - protected ScheduledExecutorService executorService; + protected ScheduledExecutorService scheduledExecutorService; + protected ExecutorService executorService; protected JmsEndpoint endpoint; protected Destination replyTo; protected AbstractMessageListenerContainer listenerContainer; @@ -63,6 +65,10 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl } public void setScheduledExecutorService(ScheduledExecutorService executorService) { + this.scheduledExecutorService = executorService; + } + + public void setOnTimeoutExecutorService(ExecutorService executorService) { this.executorService = executorService; } @@ -245,12 +251,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl @Override protected void doStart() throws Exception { ObjectHelper.notNull(executorService, "executorService", this); + ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService", this); ObjectHelper.notNull(endpoint, "endpoint", this); // timeout map to use for purging messages which have timed out, while waiting for an expected reply // when doing request/reply over JMS log.trace("Using timeout checker interval with {} millis", endpoint.getRequestTimeoutCheckerInterval()); - correlation = new CorrelationTimeoutMap(executorService, endpoint.getRequestTimeoutCheckerInterval()); + correlation = new CorrelationTimeoutMap(scheduledExecutorService, endpoint.getRequestTimeoutCheckerInterval(), executorService); ServiceHelper.startService(correlation); // create JMS listener and start it @@ -278,6 +285,10 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl } // must also stop executor service + if (scheduledExecutorService != null) { + camelContext.getExecutorServiceManager().shutdownGraceful(scheduledExecutorService); + scheduledExecutorService = null; + } if (executorService != null) { camelContext.getExecutorServiceManager().shutdownGraceful(executorService); executorService = null;