Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 89f6906cf -> 47fec2063
  refs/heads/master 3cc803a1a -> dd0ed95d5


CAMEL-6439: Use a dedicated thread pool to continue routing exchanges when a 
timeout is triggered during request/reply over JMS, so that the timeout does 
not steal the scheduler thread or cause a potential deadlock.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dd0ed95d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dd0ed95d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dd0ed95d

Branch: refs/heads/master
Commit: dd0ed95d58f4bf1673836a6d50e2f4bffcf3d279
Parents: 3cc803a
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun May 29 10:29:24 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun May 29 10:39:19 2016 +0200

----------------------------------------------------------------------
 .../camel/component/jms/JmsComponent.java       |  7 +++++
 .../camel/component/jms/JmsConfiguration.java   | 14 ++++++++++
 .../apache/camel/component/jms/JmsEndpoint.java |  5 ++++
 .../apache/camel/component/jms/JmsProducer.java | 29 +++++++++++++++++---
 .../jms/reply/CorrelationTimeoutMap.java        | 28 ++++++++++++++-----
 .../camel/component/jms/reply/ReplyManager.java |  9 +++++-
 .../jms/reply/ReplyManagerSupport.java          | 15 ++++++++--
 7 files changed, 93 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dd0ed95d/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
index 8553688..9beb9a2 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
@@ -391,6 +391,13 @@ public class JmsComponent extends UriEndpointComponent 
implements ApplicationCon
     }
 
     /**
+     * Specifies the maximum number of concurrent consumers for continue 
routing when timeout occurred when using request/reply over JMS.
+     */
+    public void setReplyOnTimeoutToMaxConcurrentConsumers(int 
maxConcurrentConsumers) {
+        
getConfiguration().setReplyToOnTimeoutMaxConcurrentConsumers(maxConcurrentConsumers);
+    }
+
+    /**
      * The number of messages per task. -1 is unlimited.
      * If you use a range for concurrent consumers (eg min < max), then this 
option can be used to set
      * a value to eg 100 to control how fast the consumers will shrink when 
less work is required.

http://git-wip-us.apache.org/repos/asf/camel/blob/dd0ed95d/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index a6a8311..140982c 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -190,6 +190,9 @@ public class JmsConfiguration implements Cloneable {
             description = "Specifies the maximum number of concurrent 
consumers when using request/reply over JMS."
                     + " See also the maxMessagesPerTask option to control 
dynamic scaling up/down of threads.")
     private int replyToMaxConcurrentConsumers;
+    @UriParam(label = "producer", defaultValue = "1",
+            description = "Specifies the maximum number of concurrent 
consumers for continue routing when timeout occurred when using request/reply 
over JMS.")
+    private int replyToOnTimeoutMaxConcurrentConsumers = 1;
     // JmsTemplate only
     @UriParam(label = "producer", defaultValue = "false",
             description = "Set if the deliveryMode, priority or timeToLive 
qualities of service should be used when sending messages."
@@ -1114,6 +1117,17 @@ public class JmsConfiguration implements Cloneable {
         this.replyToMaxConcurrentConsumers = replyToMaxConcurrentConsumers;
     }
 
+    public int getReplyToOnTimeoutMaxConcurrentConsumers() {
+        return replyToOnTimeoutMaxConcurrentConsumers;
+    }
+
+    /**
+     * Specifies the maximum number of concurrent consumers for continue 
routing when timeout occurred when using request/reply over JMS.
+     */
+    public void setReplyToOnTimeoutMaxConcurrentConsumers(int 
replyToOnTimeoutMaxConcurrentConsumers) {
+        this.replyToOnTimeoutMaxConcurrentConsumers = 
replyToOnTimeoutMaxConcurrentConsumers;
+    }
+
     public boolean isExplicitQosEnabled() {
         return explicitQosEnabled != null ? explicitQosEnabled : false;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/dd0ed95d/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index 6804470..cb02f0c 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -605,6 +605,11 @@ public class JmsEndpoint extends DefaultEndpoint 
implements AsyncEndpoint, Heade
     }
 
     @ManagedAttribute
+    public int getReplyToOnTimeoutMaxConcurrentConsumers() {
+        return getConfiguration().getReplyToOnTimeoutMaxConcurrentConsumers();
+    }
+
+    @ManagedAttribute
     public int getMaxMessagesPerTask() {
         return getConfiguration().getMaxMessagesPerTask();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/dd0ed95d/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index 94a974a..251e55f 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.jms;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -542,8 +543,18 @@ public class JmsProducer extends DefaultAsyncProducer {
         replyManager.setEndpoint(getEndpoint());
 
         String name = "JmsReplyManagerTimeoutChecker[" + 
getEndpoint().getEndpointConfiguredDestinationName() + "]";
-        ScheduledExecutorService replyManagerExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
-        replyManager.setScheduledExecutorService(replyManagerExecutorService);
+        ScheduledExecutorService replyManagerScheduledExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
+        
replyManager.setScheduledExecutorService(replyManagerScheduledExecutorService);
+
+        name = "JmsReplyManagerOnTimeout[" + 
getEndpoint().getEndpointConfiguredDestinationName() + "]";
+        // allow the timeout thread to timeout so during normal operation we 
do not have a idle thread
+        int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
+        if (max <= 0) {
+            throw new IllegalArgumentException("The option 
replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
+        }
+        ExecutorService replyManagerExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager,
 name, 0, max);
+        replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
+
         ServiceHelper.startService(replyManager);
 
         return replyManager;
@@ -555,8 +566,18 @@ public class JmsProducer extends DefaultAsyncProducer {
         replyManager.setEndpoint(getEndpoint());
 
         String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]";
-        ScheduledExecutorService replyManagerExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
-        replyManager.setScheduledExecutorService(replyManagerExecutorService);
+        ScheduledExecutorService replyManagerScheduledExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
+        
replyManager.setScheduledExecutorService(replyManagerScheduledExecutorService);
+
+        name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
+        // allow the timeout thread to timeout so during normal operation we 
do not have a idle thread
+        int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
+        if (max <= 0) {
+            throw new IllegalArgumentException("The option 
replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
+        }
+        ExecutorService replyManagerExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager,
 name, 0, max);
+        replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
+
         ServiceHelper.startService(replyManager);
 
         return replyManager;

http://git-wip-us.apache.org/repos/asf/camel/blob/dd0ed95d/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java
index e0e6731..d4913ef 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.jms.reply;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.camel.support.DefaultTimeoutMap;
@@ -30,9 +31,11 @@ import org.apache.camel.support.DefaultTimeoutMap;
 public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, 
ReplyHandler> {
 
     private CorrelationListener listener;
+    private ExecutorService executorService;
 
-    public CorrelationTimeoutMap(ScheduledExecutorService executor, long 
requestMapPollTimeMillis) {
+    public CorrelationTimeoutMap(ScheduledExecutorService executor, long 
requestMapPollTimeMillis, ExecutorService executorService) {
         super(executor, requestMapPollTimeMillis);
+        this.executorService = executorService;
     }
 
     public void setListener(CorrelationListener listener) {
@@ -49,12 +52,23 @@ public class CorrelationTimeoutMap extends 
DefaultTimeoutMap<String, ReplyHandle
             // ignore
         }
 
-        // trigger timeout
-        try {
-            value.onTimeout(key);
-        } catch (Throwable e) {
-            // must ignore so we ensure we evict the element
-            log.warn("Error processing onTimeout for correlationID: " + key + 
" due: " + e.getMessage() + ". This exception is ignored.", e);
+        final Runnable task = new Runnable() {
+            @Override
+            public void run() {
+                // trigger timeout
+                try {
+                    value.onTimeout(key);
+                } catch (Throwable e) {
+                    // must ignore so we ensure we evict the element
+                    log.warn("Error processing onTimeout for correlationID: " 
+ key + " due: " + e.getMessage() + ". This exception is ignored.", e);
+                }
+            }
+        };
+        if (executorService != null) {
+            executorService.submit(task);
+        } else {
+            // run task synchronously
+            task.run();
         }
 
         // return true to remove the element

http://git-wip-us.apache.org/repos/asf/camel/blob/dd0ed95d/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
index e3b65aa..f5d3beb 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.jms.reply;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -47,11 +48,17 @@ public interface ReplyManager extends 
SessionAwareMessageListener {
     void setReplyTo(Destination replyTo);
 
     /**
-     * Sets the scheduled to use when checking for timeouts (no reply received 
within a given time period)
+     * Sets the scheduled thread pool to use when checking for timeouts (no 
reply received within a given time period)
      */
     void setScheduledExecutorService(ScheduledExecutorService executorService);
 
     /**
+     * Sets the thread pool to use for continue routing {@link Exchange} when 
a timeout was triggered
+     * when doing request/reply over JMS.
+     */
+    void setOnTimeoutExecutorService(ExecutorService executorService);
+
+    /**
      * Gets the reply to queue being used
      */
     Destination getReplyTo();

http://git-wip-us.apache.org/repos/asf/camel/blob/dd0ed95d/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 9e242e2..bb81399 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms.reply;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.jms.Destination;
@@ -49,7 +50,8 @@ public abstract class ReplyManagerSupport extends 
ServiceSupport implements Repl
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
     protected final CamelContext camelContext;
-    protected ScheduledExecutorService executorService;
+    protected ScheduledExecutorService scheduledExecutorService;
+    protected ExecutorService executorService;
     protected JmsEndpoint endpoint;
     protected Destination replyTo;
     protected AbstractMessageListenerContainer listenerContainer;
@@ -62,6 +64,10 @@ public abstract class ReplyManagerSupport extends 
ServiceSupport implements Repl
     }
 
     public void setScheduledExecutorService(ScheduledExecutorService 
executorService) {
+        this.scheduledExecutorService = executorService;
+    }
+
+    public void setOnTimeoutExecutorService(ExecutorService executorService) {
         this.executorService = executorService;
     }
 
@@ -244,12 +250,13 @@ public abstract class ReplyManagerSupport extends 
ServiceSupport implements Repl
     @Override
     protected void doStart() throws Exception {
         ObjectHelper.notNull(executorService, "executorService", this);
+        ObjectHelper.notNull(scheduledExecutorService, 
"scheduledExecutorService", this);
         ObjectHelper.notNull(endpoint, "endpoint", this);
 
         // timeout map to use for purging messages which have timed out, while 
waiting for an expected reply
         // when doing request/reply over JMS
         log.trace("Using timeout checker interval with {} millis", 
endpoint.getRequestTimeoutCheckerInterval());
-        correlation = new CorrelationTimeoutMap(executorService, 
endpoint.getRequestTimeoutCheckerInterval());
+        correlation = new CorrelationTimeoutMap(scheduledExecutorService, 
endpoint.getRequestTimeoutCheckerInterval(), executorService);
         ServiceHelper.startService(correlation);
 
         // create JMS listener and start it
@@ -277,6 +284,10 @@ public abstract class ReplyManagerSupport extends 
ServiceSupport implements Repl
         }
 
         // must also stop executor service
+        if (scheduledExecutorService != null) {
+            
camelContext.getExecutorServiceManager().shutdownGraceful(scheduledExecutorService);
+            scheduledExecutorService = null;
+        }
         if (executorService != null) {
             
camelContext.getExecutorServiceManager().shutdownGraceful(executorService);
             executorService = null;

Reply via email to