Repository: camel Updated Branches: refs/heads/camel-2.17.x e8bff3a0a -> 00cb4b89b refs/heads/master a4c43a6ca -> d403cc908
CAMEL-9900 - provide option for MessageListenerContainer for reply managers to stop quicker when CamelContext is stopping Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d403cc90 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d403cc90 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d403cc90 Branch: refs/heads/master Commit: d403cc9084123667e29398dad5a0eaa109521de3 Parents: a4c43a6 Author: Jonathan Anstey <jans...@gmail.com> Authored: Thu Apr 21 19:21:48 2016 -0230 Committer: Jonathan Anstey <jans...@gmail.com> Committed: Thu Apr 21 19:21:48 2016 -0230 ---------------------------------------------------------------------- components/camel-jms/src/main/docs/jms.adoc | 10 +++++++-- .../camel/component/jms/JmsComponent.java | 12 ++++++++++- .../camel/component/jms/JmsConfiguration.java | 22 +++++++++++++++++++- .../apache/camel/component/jms/JmsEndpoint.java | 10 +++++++++ .../ExclusiveQueueMessageListenerContainer.java | 3 +-- .../SharedQueueMessageListenerContainer.java | 5 ++--- .../jms/reply/TemporaryQueueReplyManager.java | 3 +-- .../camel/component/jms/JmsComponentTest.java | 2 ++ .../jms/JmsEndpointConfigurationTest.java | 4 ++++ 9 files changed, 60 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/docs/jms.adoc ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/docs/jms.adoc b/components/camel-jms/src/main/docs/jms.adoc index a930518..3745626 100644 --- a/components/camel-jms/src/main/docs/jms.adoc +++ b/components/camel-jms/src/main/docs/jms.adoc @@ -207,8 +207,9 @@ Component options +++++++++++++++++ + // component options: START -The JMS component supports 68 options which are listed below. +The JMS component supports 69 options which are listed below. @@ -217,6 +218,7 @@ The JMS component supports 68 options which are listed below. | Name | Java Type | Description | configuration | JmsConfiguration | To use a shared JMS configuration | acceptMessagesWhileStopping | boolean | Specifies whether the consumer accept messages while it is stopping. You may consider enabling this option if you start and stop JMS routes at runtime while there are still messages enqued on the queue. If this option is false and you stop the JMS route then messages may be rejected and the JMS broker would have to attempt redeliveries which yet again may be rejected and eventually the message may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option. +| allowReplyManagerQuickStop | boolean | Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfigurationisAcceptMessagesWhileStopping is enabled and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by default in the regular JMS consumers but to enable for reply managers you must enable this flag. | acknowledgementMode | int | The JMS acknowledgement mode defined as an Integer. Allows you to set vendor-specific extensions to the acknowledgment mode. For the regular modes it is preferable to use the acknowledgementModeName instead. | eagerLoadingOfProperties | boolean | Enables eager loading of JMS properties as soon as a message is loaded which generally is inefficient as the JMS properties may not be required but sometimes can catch early any issues with the underlying JMS provider and the use of JMS properties | acknowledgementModeName | String | The JMS acknowledgement name which is one of: SESSION_TRANSACTED CLIENT_ACKNOWLEDGE AUTO_ACKNOWLEDGE DUPS_OK_ACKNOWLEDGE @@ -287,13 +289,15 @@ The JMS component supports 68 options which are listed below. // component options: END + [[JMS-Endpointoptions]] Endpoint options ++++++++++++++++ + // endpoint options: START -The JMS component supports 75 endpoint options which are listed below: +The JMS component supports 76 endpoint options which are listed below: [width="100%",cols="2s,1,1m,1m,5",options="header"] |======================================================================= @@ -316,6 +320,7 @@ The JMS component supports 75 endpoint options which are listed below: | replyToDeliveryPersistent | consumer | true | boolean | Specifies whether to use persistent delivery by default for replies. | selector | consumer | | String | Sets the JMS selector to use | acceptMessagesWhileStopping | consumer (advanced) | false | boolean | Specifies whether the consumer accept messages while it is stopping. You may consider enabling this option if you start and stop JMS routes at runtime while there are still messages enqued on the queue. If this option is false and you stop the JMS route then messages may be rejected and the JMS broker would have to attempt redeliveries which yet again may be rejected and eventually the message may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option. +| allowReplyManagerQuickStop | consumer (advanced) | false | boolean | Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case link JmsConfigurationisAcceptMessagesWhileStopping() is enabled and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by default in the regular JMS consumers but to enable for reply managers you must enable this flag. | consumerType | consumer (advanced) | Default | ConsumerType | The consumer type to use which can be one of: Simple Default or Custom. The consumer type determines which Spring JMS listener to use. Default will use org.springframework.jms.listener.DefaultMessageListenerContainer Simple will use org.springframework.jms.listener.SimpleMessageListenerContainer. When Custom is specified the MessageListenerContainerFactory defined by the messageListenerContainerFactory option will determine what org.springframework.jms.listener.AbstractMessageListenerContainer to use. | defaultTaskExecutorType | consumer (advanced) | | DefaultTaskExecutorType | Specifies what default TaskExecutor type to use in the DefaultMessageListenerContainer for both consumer endpoints and the ReplyTo consumer of producer endpoints. Possible values: SimpleAsync (uses Spring's SimpleAsyncTaskExecutor) or ThreadPool (uses Spring's ThreadPoolTaskExecutor with optimal values - cached threadpool-like). If not set it defaults to the previous behaviour which uses a cached thread pool for consumer endpoints and SimpleAsync for reply consumers. The use of ThreadPool is recommended to reduce thread trash in elastic configurations with dynamically increasing and decreasing concurrent consumers. | eagerLoadingOfProperties | consumer (advanced) | false | boolean | Enables eager loading of JMS properties as soon as a message is loaded which generally is inefficient as the JMS properties may not be required but sometimes can catch early any issues with the underlying JMS provider and the use of JMS properties @@ -377,6 +382,7 @@ The JMS component supports 75 endpoint options which are listed below: // endpoint options: END + [[JMS-MessageMappingbetweenJMSandCamel]] Message Mapping between JMS and Camel ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java index 46ef75d..4ff90e9 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java @@ -172,7 +172,17 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon * may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option. */ public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { - getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); + getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); + } + + /** + * Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow + * the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping + * is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by + * default in the regular JMS consumers but to enable for reply managers you must enable this flag. + */ + public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) { + getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index c9a7117..a4c57e2 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -98,12 +98,18 @@ public class JmsConfiguration implements Cloneable { description = "Specifies whether the consumer container should auto-startup.") private boolean autoStartup = true; @UriParam(label = "consumer,advanced", + description = "Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow " + + " the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping" + + " is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by" + + " default in the regular JMS consumers but to enable for reply managers you must enable this flag.") + private boolean allowReplyManagerQuickStop; + @UriParam(label = "consumer,advanced", description = "Specifies whether the consumer accept messages while it is stopping." + " You may consider enabling this option, if you start and stop JMS routes at runtime, while there are still messages" + " enqued on the queue. If this option is false, and you stop the JMS route, then messages may be rejected," + " and the JMS broker would have to attempt redeliveries, which yet again may be rejected, and eventually the message" + " may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option.") - private boolean acceptMessagesWhileStopping; + private boolean acceptMessagesWhileStopping; @UriParam(description = "Sets the JMS client ID to use. Note that this value, if specified, must be unique and can only be used by a single JMS connection instance." + " It is typically only required for durable topic subscriptions." + " If using Apache ActiveMQ you may prefer to use Virtual Topics instead.") @@ -762,6 +768,20 @@ public class JmsConfiguration implements Cloneable { this.acceptMessagesWhileStopping = acceptMessagesWhileStopping; } + /** + * Whether the {@link DefaultMessageListenerContainer} used in the reply managers for request-reply messaging allow + * the {@link DefaultMessageListenerContainer.runningAllowed} flag to quick stop in case {@link JmsConfiguration#isAcceptMessagesWhileStopping()} + * is enabled, and {@link org.apache.camel.CamelContext} is currently being stopped. This quick stop ability is enabled by + * default in the regular JMS consumers but to enable for reply managers you must enable this flag. + */ + public boolean isAllowReplyManagerQuickStop() { + return allowReplyManagerQuickStop; + } + + public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) { + this.allowReplyManagerQuickStop = allowReplyManagerQuickStop; + } + public String getClientId() { return clientId; } http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index c671e75..c6a3599 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -695,6 +695,11 @@ public class JmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Heade } @ManagedAttribute + public boolean isAllowReplyManagerQuickStop() { + return getConfiguration().isAllowReplyManagerQuickStop(); + } + + @ManagedAttribute public boolean isAlwaysCopyMessage() { return getConfiguration().isAlwaysCopyMessage(); } @@ -792,6 +797,11 @@ public class JmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Heade } @ManagedAttribute + public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) { + getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop); + } + + @ManagedAttribute public void setAcknowledgementMode(int consumerAcknowledgementMode) { getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode); } http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java index 64b7bfe..9ca4cbc 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java @@ -37,7 +37,6 @@ public class ExclusiveQueueMessageListenerContainer extends DefaultJmsMessageLis // no need to override any methods currently public ExclusiveQueueMessageListenerContainer(JmsEndpoint endpoint) { - // request-reply listener container should not allow quick-stop so we can keep listening for reply messages - super(endpoint, false); + super(endpoint, endpoint.isAllowReplyManagerQuickStop()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java index 3b682e0..dd5954b 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java @@ -48,8 +48,7 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen * @param fixedMessageSelector the fixed selector */ public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, String fixedMessageSelector) { - // request-reply listener container should not allow quick-stop so we can keep listening for reply messages - super(endpoint, false); + super(endpoint, endpoint.isAllowReplyManagerQuickStop()); this.fixedMessageSelector = fixedMessageSelector; } @@ -60,7 +59,7 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen * @param creator the create to create the dynamic selector */ public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, MessageSelectorCreator creator) { - super(endpoint, false); + super(endpoint, endpoint.isAllowReplyManagerQuickStop()); this.creator = creator; } http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java index 6d1f51d..42423a3 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java @@ -96,8 +96,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { @Override protected AbstractMessageListenerContainer createListenerContainer() throws Exception { // Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193) - // request-reply listener container should not allow quick-stop so we can keep listening for reply messages - DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint, false); + DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint, endpoint.isAllowReplyManagerQuickStop()); answer.setDestinationName("temporary"); answer.setDestinationResolver(destResolver); http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java index cf7b8a9..cded429 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java @@ -38,6 +38,7 @@ public class JmsComponentTest extends CamelTestSupport { assertEquals("Bye World", reply); assertEquals(true, endpoint.isAcceptMessagesWhileStopping()); + assertEquals(true, endpoint.isAllowReplyManagerQuickStop()); assertEquals(true, endpoint.isAlwaysCopyMessage()); assertEquals(1, endpoint.getAcknowledgementMode()); assertEquals(true, endpoint.isAutoStartup()); @@ -65,6 +66,7 @@ public class JmsComponentTest extends CamelTestSupport { JmsComponent comp = jmsComponentAutoAcknowledge(connectionFactory); comp.setAcceptMessagesWhileStopping(true); + comp.setAllowReplyManagerQuickStop(true); comp.setAlwaysCopyMessage(true); comp.setAcknowledgementMode(1); comp.setAutoStartup(true); http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java index b504e62..2b74366 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java @@ -320,6 +320,7 @@ public class JmsEndpointConfigurationTest extends CamelTestSupport { assertEquals("Foo", endpoint.getEndpointConfiguredDestinationName()); assertFalse(endpoint.isAcceptMessagesWhileStopping()); + assertFalse(endpoint.isAllowReplyManagerQuickStop()); assertFalse(endpoint.isAlwaysCopyMessage()); assertTrue(endpoint.isAllowNullBody()); assertFalse(endpoint.isAsyncConsumer()); @@ -362,6 +363,9 @@ public class JmsEndpointConfigurationTest extends CamelTestSupport { endpoint.setAcceptMessagesWhileStopping(true); assertTrue(endpoint.isAcceptMessagesWhileStopping()); + + endpoint.setAllowReplyManagerQuickStop(true); + assertTrue(endpoint.isAllowReplyManagerQuickStop()); endpoint.setAcknowledgementMode(2); assertEquals(2, endpoint.getAcknowledgementMode());