CAMEL-9900 - provide option for MessageListenerContainer for reply managers to stop quicker when CamelContext is stopping
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c17c01a1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c17c01a1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c17c01a1 Branch: refs/heads/camel-2.17.x Commit: c17c01a10d7bca46485f3792ffb3a0070461560c Parents: e8bff3a Author: Jonathan Anstey <jans...@gmail.com> Authored: Thu Apr 21 19:21:48 2016 -0230 Committer: Jonathan Anstey <jans...@gmail.com> Committed: Thu Apr 21 19:32:06 2016 -0230 ---------------------------------------------------------------------- .../camel/component/jms/JmsComponent.java | 12 ++++++++++- .../camel/component/jms/JmsConfiguration.java | 22 +++++++++++++++++++- .../apache/camel/component/jms/JmsEndpoint.java | 10 +++++++++ .../ExclusiveQueueMessageListenerContainer.java | 3 +-- .../SharedQueueMessageListenerContainer.java | 5 ++--- .../jms/reply/TemporaryQueueReplyManager.java | 3 +-- .../camel/component/jms/JmsComponentTest.java | 2 ++ .../jms/JmsEndpointConfigurationTest.java | 4 ++++ 8 files changed, 52 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java index 46ef75d..4ff90e9 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java @@ -172,7 +172,17 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon * may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option. */ public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { - getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); + getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); + } + + /** + * Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow + * the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping + * is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by + * default in the regular JMS consumers but to enable for reply managers you must enable this flag. + */ + public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) { + getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index c9a7117..a4c57e2 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -98,12 +98,18 @@ public class JmsConfiguration implements Cloneable { description = "Specifies whether the consumer container should auto-startup.") private boolean autoStartup = true; @UriParam(label = "consumer,advanced", + description = "Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow " + + " the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping" + + " is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by" + + " default in the regular JMS consumers but to enable for reply managers you must enable this flag.") + private boolean allowReplyManagerQuickStop; + @UriParam(label = "consumer,advanced", description = "Specifies whether the consumer accept messages while it is stopping." + " You may consider enabling this option, if you start and stop JMS routes at runtime, while there are still messages" + " enqued on the queue. If this option is false, and you stop the JMS route, then messages may be rejected," + " and the JMS broker would have to attempt redeliveries, which yet again may be rejected, and eventually the message" + " may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option.") - private boolean acceptMessagesWhileStopping; + private boolean acceptMessagesWhileStopping; @UriParam(description = "Sets the JMS client ID to use. Note that this value, if specified, must be unique and can only be used by a single JMS connection instance." + " It is typically only required for durable topic subscriptions." + " If using Apache ActiveMQ you may prefer to use Virtual Topics instead.") @@ -762,6 +768,20 @@ public class JmsConfiguration implements Cloneable { this.acceptMessagesWhileStopping = acceptMessagesWhileStopping; } + /** + * Whether the {@link DefaultMessageListenerContainer} used in the reply managers for request-reply messaging allow + * the {@link DefaultMessageListenerContainer.runningAllowed} flag to quick stop in case {@link JmsConfiguration#isAcceptMessagesWhileStopping()} + * is enabled, and {@link org.apache.camel.CamelContext} is currently being stopped. This quick stop ability is enabled by + * default in the regular JMS consumers but to enable for reply managers you must enable this flag. + */ + public boolean isAllowReplyManagerQuickStop() { + return allowReplyManagerQuickStop; + } + + public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) { + this.allowReplyManagerQuickStop = allowReplyManagerQuickStop; + } + public String getClientId() { return clientId; } http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index b1c0038..dface05 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -694,6 +694,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy } @ManagedAttribute + public boolean isAllowReplyManagerQuickStop() { + return getConfiguration().isAllowReplyManagerQuickStop(); + } + + @ManagedAttribute public boolean isAlwaysCopyMessage() { return getConfiguration().isAlwaysCopyMessage(); } @@ -791,6 +796,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy } @ManagedAttribute + public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) { + getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop); + } + + @ManagedAttribute public void setAcknowledgementMode(int consumerAcknowledgementMode) { getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode); } http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java index 64b7bfe..9ca4cbc 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java @@ -37,7 +37,6 @@ public class ExclusiveQueueMessageListenerContainer extends DefaultJmsMessageLis // no need to override any methods currently public ExclusiveQueueMessageListenerContainer(JmsEndpoint endpoint) { - // request-reply listener container should not allow quick-stop so we can keep listening for reply messages - super(endpoint, false); + super(endpoint, endpoint.isAllowReplyManagerQuickStop()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java index 3b682e0..dd5954b 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java @@ -48,8 +48,7 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen * @param fixedMessageSelector the fixed selector */ public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, String fixedMessageSelector) { - // request-reply listener container should not allow quick-stop so we can keep listening for reply messages - super(endpoint, false); + super(endpoint, endpoint.isAllowReplyManagerQuickStop()); this.fixedMessageSelector = fixedMessageSelector; } @@ -60,7 +59,7 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen * @param creator the create to create the dynamic selector */ public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, MessageSelectorCreator creator) { - super(endpoint, false); + super(endpoint, endpoint.isAllowReplyManagerQuickStop()); this.creator = creator; } http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java index 6d1f51d..42423a3 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java @@ -96,8 +96,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { @Override protected AbstractMessageListenerContainer createListenerContainer() throws Exception { // Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193) - // request-reply listener container should not allow quick-stop so we can keep listening for reply messages - DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint, false); + DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint, endpoint.isAllowReplyManagerQuickStop()); answer.setDestinationName("temporary"); answer.setDestinationResolver(destResolver); http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java index cf7b8a9..cded429 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java @@ -38,6 +38,7 @@ public class JmsComponentTest extends CamelTestSupport { assertEquals("Bye World", reply); assertEquals(true, endpoint.isAcceptMessagesWhileStopping()); + assertEquals(true, endpoint.isAllowReplyManagerQuickStop()); assertEquals(true, endpoint.isAlwaysCopyMessage()); assertEquals(1, endpoint.getAcknowledgementMode()); assertEquals(true, endpoint.isAutoStartup()); @@ -65,6 +66,7 @@ public class JmsComponentTest extends CamelTestSupport { JmsComponent comp = jmsComponentAutoAcknowledge(connectionFactory); comp.setAcceptMessagesWhileStopping(true); + comp.setAllowReplyManagerQuickStop(true); comp.setAlwaysCopyMessage(true); comp.setAcknowledgementMode(1); comp.setAutoStartup(true); http://git-wip-us.apache.org/repos/asf/camel/blob/c17c01a1/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java index b504e62..2b74366 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java @@ -320,6 +320,7 @@ public class JmsEndpointConfigurationTest extends CamelTestSupport { assertEquals("Foo", endpoint.getEndpointConfiguredDestinationName()); assertFalse(endpoint.isAcceptMessagesWhileStopping()); + assertFalse(endpoint.isAllowReplyManagerQuickStop()); assertFalse(endpoint.isAlwaysCopyMessage()); assertTrue(endpoint.isAllowNullBody()); assertFalse(endpoint.isAsyncConsumer()); @@ -362,6 +363,9 @@ public class JmsEndpointConfigurationTest extends CamelTestSupport { endpoint.setAcceptMessagesWhileStopping(true); assertTrue(endpoint.isAcceptMessagesWhileStopping()); + + endpoint.setAllowReplyManagerQuickStop(true); + assertTrue(endpoint.isAllowReplyManagerQuickStop()); endpoint.setAcknowledgementMode(2); assertEquals(2, endpoint.getAcknowledgementMode());