CAMEL-9900 - provide option for MessageListenerContainer for reply managers to 
stop quicker when CamelContext is stopping


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c17c01a1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c17c01a1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c17c01a1

Branch: refs/heads/camel-2.17.x
Commit: c17c01a10d7bca46485f3792ffb3a0070461560c
Parents: e8bff3a
Author: Jonathan Anstey <jans...@gmail.com>
Authored: Thu Apr 21 19:21:48 2016 -0230
Committer: Jonathan Anstey <jans...@gmail.com>
Committed: Thu Apr 21 19:32:06 2016 -0230

----------------------------------------------------------------------
 .../camel/component/jms/JmsComponent.java       | 12 ++++++++++-
 .../camel/component/jms/JmsConfiguration.java   | 22 +++++++++++++++++++-
 .../apache/camel/component/jms/JmsEndpoint.java | 10 +++++++++
 .../ExclusiveQueueMessageListenerContainer.java |  3 +--
 .../SharedQueueMessageListenerContainer.java    |  5 ++---
 .../jms/reply/TemporaryQueueReplyManager.java   |  3 +--
 .../camel/component/jms/JmsComponentTest.java   |  2 ++
 .../jms/JmsEndpointConfigurationTest.java       |  4 ++++
 8 files changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
index 46ef75d..4ff90e9 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
@@ -172,7 +172,17 @@ public class JmsComponent extends UriEndpointComponent 
implements ApplicationCon
      * may be moved at a dead letter queue on the JMS broker. To avoid this 
its recommended to enable this option.
      */
     public void setAcceptMessagesWhileStopping(boolean 
acceptMessagesWhileStopping) {
-        
getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
+        
getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); 
  
+    }
+    
+    /**
+     * Whether the DefaultMessageListenerContainer used in the reply managers 
for request-reply messaging allow 
+     * the DefaultMessageListenerContainer.runningAllowed flag to quick stop 
in case JmsConfiguration#isAcceptMessagesWhileStopping
+     * is enabled, and org.apache.camel.CamelContext is currently being 
stopped. This quick stop ability is enabled by
+     * default in the regular JMS consumers but to enable for reply managers 
you must enable this flag.
+      */
+    public void setAllowReplyManagerQuickStop(boolean 
allowReplyManagerQuickStop) {
+        
getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index c9a7117..a4c57e2 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -98,12 +98,18 @@ public class JmsConfiguration implements Cloneable {
             description = "Specifies whether the consumer container should 
auto-startup.")
     private boolean autoStartup = true;
     @UriParam(label = "consumer,advanced",
+            description = "Whether the DefaultMessageListenerContainer used in 
the reply managers for request-reply messaging allow "
+                    + " the DefaultMessageListenerContainer.runningAllowed 
flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping"
+                    + " is enabled, and org.apache.camel.CamelContext is 
currently being stopped. This quick stop ability is enabled by"
+                    + " default in the regular JMS consumers but to enable for 
reply managers you must enable this flag.")
+    private boolean allowReplyManagerQuickStop;   
+    @UriParam(label = "consumer,advanced",
             description = "Specifies whether the consumer accept messages 
while it is stopping."
                     + " You may consider enabling this option, if you start 
and stop JMS routes at runtime, while there are still messages"
                     + " enqued on the queue. If this option is false, and you 
stop the JMS route, then messages may be rejected,"
                     + " and the JMS broker would have to attempt redeliveries, 
which yet again may be rejected, and eventually the message"
                     + " may be moved at a dead letter queue on the JMS broker. 
To avoid this its recommended to enable this option.")
-    private boolean acceptMessagesWhileStopping;
+    private boolean acceptMessagesWhileStopping;    
     @UriParam(description = "Sets the JMS client ID to use. Note that this 
value, if specified, must be unique and can only be used by a single JMS 
connection instance."
             + " It is typically only required for durable topic subscriptions."
             + " If using Apache ActiveMQ you may prefer to use Virtual Topics 
instead.")
@@ -762,6 +768,20 @@ public class JmsConfiguration implements Cloneable {
         this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
     }
 
+    /**
+     * Whether the {@link DefaultMessageListenerContainer} used in the reply 
managers for request-reply messaging allow 
+     * the {@link DefaultMessageListenerContainer.runningAllowed} flag to 
quick stop in case {@link JmsConfiguration#isAcceptMessagesWhileStopping()} 
+     * is enabled, and {@link org.apache.camel.CamelContext} is currently 
being stopped. This quick stop ability is enabled by
+     * default in the regular JMS consumers but to enable for reply managers 
you must enable this flag.
+     */
+    public boolean isAllowReplyManagerQuickStop() {
+        return allowReplyManagerQuickStop;
+    }
+
+    public void setAllowReplyManagerQuickStop(boolean 
allowReplyManagerQuickStop) {
+        this.allowReplyManagerQuickStop = allowReplyManagerQuickStop;
+    }
+    
     public String getClientId() {
         return clientId;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index b1c0038..dface05 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -694,6 +694,11 @@ public class JmsEndpoint extends DefaultEndpoint 
implements HeaderFilterStrategy
     }
 
     @ManagedAttribute
+    public boolean isAllowReplyManagerQuickStop() {
+        return getConfiguration().isAllowReplyManagerQuickStop();
+    }
+    
+    @ManagedAttribute
     public boolean isAlwaysCopyMessage() {
         return getConfiguration().isAlwaysCopyMessage();
     }
@@ -791,6 +796,11 @@ public class JmsEndpoint extends DefaultEndpoint 
implements HeaderFilterStrategy
     }
 
     @ManagedAttribute
+    public void setAllowReplyManagerQuickStop(boolean 
allowReplyManagerQuickStop) {
+        
getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop);
+    }
+    
+    @ManagedAttribute
     public void setAcknowledgementMode(int consumerAcknowledgementMode) {
         getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
index 64b7bfe..9ca4cbc 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
@@ -37,7 +37,6 @@ public class ExclusiveQueueMessageListenerContainer extends 
DefaultJmsMessageLis
     // no need to override any methods currently
 
     public ExclusiveQueueMessageListenerContainer(JmsEndpoint endpoint) {
-        // request-reply listener container should not allow quick-stop so we 
can keep listening for reply messages
-        super(endpoint, false);
+        super(endpoint, endpoint.isAllowReplyManagerQuickStop());
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
index 3b682e0..dd5954b 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
@@ -48,8 +48,7 @@ public class SharedQueueMessageListenerContainer extends 
DefaultJmsMessageListen
      * @param fixedMessageSelector the fixed selector
      */
     public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, String 
fixedMessageSelector) {
-        // request-reply listener container should not allow quick-stop so we 
can keep listening for reply messages
-        super(endpoint, false);
+        super(endpoint, endpoint.isAllowReplyManagerQuickStop());
         this.fixedMessageSelector = fixedMessageSelector;
     }
 
@@ -60,7 +59,7 @@ public class SharedQueueMessageListenerContainer extends 
DefaultJmsMessageListen
      * @param creator the create to create the dynamic selector
      */
     public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, 
MessageSelectorCreator creator) {
-        super(endpoint, false);
+        super(endpoint, endpoint.isAllowReplyManagerQuickStop());
         this.creator = creator;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index 6d1f51d..42423a3 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -96,8 +96,7 @@ public class TemporaryQueueReplyManager extends 
ReplyManagerSupport {
     @Override
     protected AbstractMessageListenerContainer createListenerContainer() 
throws Exception {
         // Use DefaultMessageListenerContainer as it supports reconnects (see 
CAMEL-3193)
-        // request-reply listener container should not allow quick-stop so we 
can keep listening for reply messages
-        DefaultMessageListenerContainer answer = new 
DefaultJmsMessageListenerContainer(endpoint, false);
+        DefaultMessageListenerContainer answer = new 
DefaultJmsMessageListenerContainer(endpoint, 
endpoint.isAllowReplyManagerQuickStop());
 
         answer.setDestinationName("temporary");
         answer.setDestinationResolver(destResolver);

http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java
 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java
index cf7b8a9..cded429 100644
--- 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java
+++ 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java
@@ -38,6 +38,7 @@ public class JmsComponentTest extends CamelTestSupport {
         assertEquals("Bye World", reply);
 
         assertEquals(true, endpoint.isAcceptMessagesWhileStopping());
+        assertEquals(true, endpoint.isAllowReplyManagerQuickStop());
         assertEquals(true, endpoint.isAlwaysCopyMessage());
         assertEquals(1, endpoint.getAcknowledgementMode());
         assertEquals(true, endpoint.isAutoStartup());
@@ -65,6 +66,7 @@ public class JmsComponentTest extends CamelTestSupport {
         JmsComponent comp = jmsComponentAutoAcknowledge(connectionFactory);
 
         comp.setAcceptMessagesWhileStopping(true);
+        comp.setAllowReplyManagerQuickStop(true);
         comp.setAlwaysCopyMessage(true);
         comp.setAcknowledgementMode(1);
         comp.setAutoStartup(true);

http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
index b504e62..2b74366 100644
--- 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
+++ 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
@@ -320,6 +320,7 @@ public class JmsEndpointConfigurationTest extends 
CamelTestSupport {
         assertEquals("Foo", endpoint.getEndpointConfiguredDestinationName());
 
         assertFalse(endpoint.isAcceptMessagesWhileStopping());
+        assertFalse(endpoint.isAllowReplyManagerQuickStop());
         assertFalse(endpoint.isAlwaysCopyMessage());
         assertTrue(endpoint.isAllowNullBody());
         assertFalse(endpoint.isAsyncConsumer());
@@ -362,6 +363,9 @@ public class JmsEndpointConfigurationTest extends 
CamelTestSupport {
 
         endpoint.setAcceptMessagesWhileStopping(true);
         assertTrue(endpoint.isAcceptMessagesWhileStopping());
+        
+        endpoint.setAllowReplyManagerQuickStop(true);
+        assertTrue(endpoint.isAllowReplyManagerQuickStop());
 
         endpoint.setAcknowledgementMode(2);
         assertEquals(2, endpoint.getAcknowledgementMode());

Reply via email to