CAMEL-7667: camel-jms route consumers will not accept new messages while the CamelContext is stopping. This prevents issues with JMS connections attempting to re-connect (due to the failover protocol) during shutdown of the Camel app. Such re-connect attempts can lead to WARNs and ERRORs in the logs, which we can avoid a bit more by letting the JMS consumer shut down quicker.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3d8ae863 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3d8ae863 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3d8ae863 Branch: refs/heads/master Commit: 3d8ae86338598d279ae2e5bf58b2676d5237b3fc Parents: 5707862 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Aug 7 15:57:41 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Aug 7 15:57:41 2014 +0200 ---------------------------------------------------------------------- .../jms/DefaultJmsMessageListenerContainer.java | 37 ++++++++++++++++++-- .../ExclusiveQueueMessageListenerContainer.java | 3 +- .../SharedQueueMessageListenerContainer.java | 5 +-- .../jms/reply/TemporaryQueueReplyManager.java | 3 +- 4 files changed, 42 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java index 793bb75..ba3282e 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java @@ -35,15 +35,48 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerContainer { private final JmsEndpoint endpoint; + private final boolean allowQuickStop; public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint) { + 
this(endpoint, true); + } + + public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint, boolean allowQuickStop) { this.endpoint = endpoint; + this.allowQuickStop = allowQuickStop; + } + + /** + * Whether this {@link DefaultMessageListenerContainer} allows the {@link #runningAllowed()} to quick stop + * in case {@link JmsConfiguration#isAcceptMessagesWhileStopping()} is enabled, and {@link org.apache.camel.CamelContext} + * is currently being stopped. + */ + protected boolean isAllowQuickStop() { + return allowQuickStop; } @Override protected boolean runningAllowed() { - // do not run if we have been stopped - return endpoint.isRunning(); + // we can stop quickly if CamelContext is being stopped, and we do not accept messages while stopping + // this allows a more cleanly shutdown of the message listener + boolean quickStop = false; + if (isAllowQuickStop() && !endpoint.isAcceptMessagesWhileStopping()) { + quickStop = endpoint.getCamelContext().getStatus().isStopping(); + } + + if (quickStop) { + // log at debug level so its quicker to see we are stopping quicker from the logs + logger.debug("runningAllowed() -> false due CamelContext is stopping and endpoint configured to not accept messages while stopping"); + return false; + } else { + // otherwise we only run if the endpoint is running + boolean answer = endpoint.isRunning(); + // log at trace level as otherwise this can be noisy during normal operation + if (logger.isTraceEnabled()) { + logger.trace("runningAllowed() -> " + answer); + } + return answer; + } } /** http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java index b572541..64b7bfe 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java @@ -37,6 +37,7 @@ public class ExclusiveQueueMessageListenerContainer extends DefaultJmsMessageLis // no need to override any methods currently public ExclusiveQueueMessageListenerContainer(JmsEndpoint endpoint) { - super(endpoint); + // request-reply listener container should not allow quick-stop so we can keep listening for reply messages + super(endpoint, false); } } http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java index f6464ff..3b682e0 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java @@ -48,7 +48,8 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen * @param fixedMessageSelector the fixed selector */ public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, String fixedMessageSelector) { - super(endpoint); + // request-reply listener container should not allow quick-stop so we can keep listening for reply messages + super(endpoint, false); this.fixedMessageSelector = fixedMessageSelector; } @@ -59,7 +60,7 @@ public 
class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen * @param creator the create to create the dynamic selector */ public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, MessageSelectorCreator creator) { - super(endpoint); + super(endpoint, false); this.creator = creator; } http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java index a5e8798..f7430eb 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java @@ -99,7 +99,8 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { @Override protected AbstractMessageListenerContainer createListenerContainer() throws Exception { // Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193) - DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint); + // request-reply listener container should not allow quick-stop so we can keep listening for reply messages + DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint, false); answer.setDestinationName("temporary"); answer.setDestinationResolver(destResolver);