CAMEL-6093: Fixed having 2+ routes consuming from the same JMS queue: when one route is stopped, do not stop the endpoint if there are still active listeners.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e21ec70 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e21ec70 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e21ec70 Branch: refs/heads/camel-2.12.x Commit: 2e21ec7012d3ab35ab8c344de0366436e4918bc5 Parents: 16d8180 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Aug 31 11:29:25 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Aug 31 11:45:13 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/jms/JmsConsumer.java | 11 ++++-- .../apache/camel/component/jms/JmsEndpoint.java | 37 ++++++++++++++++---- .../jms/reply/ReplyManagerSupport.java | 12 +++++-- .../jms/TwoConsumerOnSameQueueTest.java | 1 - 4 files changed, 47 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java index 50b7833..7bf6ab5 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java @@ -133,7 +133,8 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService { if (listenerContainer == null) { createMessageListenerContainer(); } - + getEndpoint().onListenerContainerStarting(listenerContainer); + if (getEndpoint().getConfiguration().isAsyncStartListener()) { getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() { @Override @@ -173,8 +174,12 @@ public class JmsConsumer extends 
DefaultConsumer implements SuspendableService { protected void stopAndDestroyListenerContainer() { if (listenerContainer != null) { - listenerContainer.stop(); - listenerContainer.destroy(); + try { + listenerContainer.stop(); + listenerContainer.destroy(); + } finally { + getEndpoint().onListenerConstainerStopped(listenerContainer); + } } // null container and listener so they are fully re created if this consumer is restarted // then we will use updated configuration from jms endpoint that may have been managed using JMX http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index 701de7c..664da7c 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -18,7 +18,7 @@ package org.apache.camel.component.jms; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.ExceptionListener; @@ -71,6 +71,7 @@ import org.springframework.util.ErrorHandler; @UriEndpoint(scheme = "jms", consumerClass = JmsConsumer.class) public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport, Service { protected final Logger log = LoggerFactory.getLogger(getClass()); + private final AtomicInteger runningMessageListeners = new AtomicInteger(); @UriParam private HeaderFilterStrategy headerFilterStrategy; private boolean pubSubDomain; @@ -82,7 +83,6 @@ public class 
JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy private String selector; @UriParam private JmsConfiguration configuration; - private final AtomicBoolean running = new AtomicBoolean(); public JmsEndpoint() { this(null, null); @@ -442,21 +442,39 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy return getComponent().getAsyncStartStopExecutorService(); } + public void onListenerContainerStarting(AbstractMessageListenerContainer container) { + runningMessageListeners.incrementAndGet(); + } + + public void onListenerConstainerStopped(AbstractMessageListenerContainer container) { + runningMessageListeners.decrementAndGet(); + } + /** * State whether this endpoint is running (eg started) */ protected boolean isRunning() { - return running.get(); + return isStarted(); } @Override - protected void doStart() throws Exception { - running.set(true); + public void stop() throws Exception { + int running = runningMessageListeners.get(); + if (running <= 0) { + super.stop(); + } else { + log.trace("There are still {} running message listeners. Cannot stop endpoint {}", running, this); + } } @Override - protected void doStop() throws Exception { - running.set(false); + public void shutdown() throws Exception { + int running = runningMessageListeners.get(); + if (running <= 0) { + super.shutdown(); + } else { + log.trace("There are still {} running message listeners. 
Cannot shutdown endpoint {}", running, this); + } } // Delegated properties from the configuration @@ -1146,6 +1164,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy return status.name(); } + @ManagedAttribute(description = "Number of running message listeners") + public int getRunningMessageListeners() { + return runningMessageListeners.get(); + } + // Implementation methods //------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index 3828926..173d9c8 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -224,6 +224,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl listenerContainer = createListenerContainer(); listenerContainer.afterPropertiesSet(); log.debug("Starting reply listener container on endpoint: {}", endpoint); + + endpoint.onListenerContainerStarting(listenerContainer); listenerContainer.start(); } @@ -233,9 +235,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl if (listenerContainer != null) { log.debug("Stopping reply listener container on endpoint: {}", endpoint); - listenerContainer.stop(); - listenerContainer.destroy(); - listenerContainer = null; + try { + listenerContainer.stop(); + listenerContainer.destroy(); + } finally { + endpoint.onListenerConstainerStopped(listenerContainer); + listenerContainer = null; + } } // must also 
stop executor service http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java index 3cdfd9e..55f73db 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java @@ -62,7 +62,6 @@ public class TwoConsumerOnSameQueueTest extends CamelTestSupport { } @Test - @Ignore public void testRemoveOneRoute() throws Exception { sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();