CAMEL-6093: Fixed having 2+ routes consuming from the same JMS queue: do not stop the endpoint when a route is stopped if there are still active listeners.
Conflicts: components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bbab8282 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bbab8282 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bbab8282 Branch: refs/heads/camel-2.11.x Commit: bbab8282cd74cfc04195aa500d621871b1e35232 Parents: 8845baf Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Aug 31 11:29:25 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Aug 31 11:50:09 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/jms/JmsConsumer.java | 11 +- .../apache/camel/component/jms/JmsEndpoint.java | 37 ++++-- .../jms/reply/ReplyManagerSupport.java | 12 +- .../jms/TwoConsumerOnSameQueueTest.java | 115 +++++++++++++++++++ 4 files changed, 162 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java index 50b7833..7bf6ab5 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java @@ -133,7 +133,8 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService { if (listenerContainer == null) { createMessageListenerContainer(); } - + getEndpoint().onListenerContainerStarting(listenerContainer); + if 
(getEndpoint().getConfiguration().isAsyncStartListener()) { getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() { @Override @@ -173,8 +174,12 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService { protected void stopAndDestroyListenerContainer() { if (listenerContainer != null) { - listenerContainer.stop(); - listenerContainer.destroy(); + try { + listenerContainer.stop(); + listenerContainer.destroy(); + } finally { + getEndpoint().onListenerConstainerStopped(listenerContainer); + } } // null container and listener so they are fully re created if this consumer is restarted // then we will use updated configuration from jms endpoint that may have been managed using JMX http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index 45828f4..5f5570c 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -18,7 +18,7 @@ package org.apache.camel.component.jms; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.ExceptionListener; @@ -67,6 +67,7 @@ import org.springframework.util.ErrorHandler; @ManagedResource(description = "Managed JMS Endpoint") public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport, Service { protected final Logger log = LoggerFactory.getLogger(getClass()); + private final 
AtomicInteger runningMessageListeners = new AtomicInteger(); private HeaderFilterStrategy headerFilterStrategy; private boolean pubSubDomain; private JmsBinding binding; @@ -74,7 +75,6 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy private Destination destination; private String selector; private JmsConfiguration configuration; - private final AtomicBoolean running = new AtomicBoolean(); public JmsEndpoint() { this(null, null); @@ -434,21 +434,39 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy return getComponent().getAsyncStartStopExecutorService(); } + public void onListenerContainerStarting(AbstractMessageListenerContainer container) { + runningMessageListeners.incrementAndGet(); + } + + public void onListenerConstainerStopped(AbstractMessageListenerContainer container) { + runningMessageListeners.decrementAndGet(); + } + /** * State whether this endpoint is running (eg started) */ protected boolean isRunning() { - return running.get(); + return isStarted(); } @Override - protected void doStart() throws Exception { - running.set(true); + public void stop() throws Exception { + int running = runningMessageListeners.get(); + if (running <= 0) { + super.stop(); + } else { + log.trace("There are still {} running message listeners. Cannot stop endpoint {}", running, this); + } } @Override - protected void doStop() throws Exception { - running.set(false); + public void shutdown() throws Exception { + int running = runningMessageListeners.get(); + if (running <= 0) { + super.shutdown(); + } else { + log.trace("There are still {} running message listeners. 
Cannot shutdown endpoint {}", running, this); + } } // Delegated properties from the configuration @@ -1138,6 +1156,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy return status.name(); } + @ManagedAttribute(description = "Number of running message listeners") + public int getRunningMessageListeners() { + return runningMessageListeners.get(); + } + // Implementation methods //------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index 3828926..173d9c8 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -224,6 +224,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl listenerContainer = createListenerContainer(); listenerContainer.afterPropertiesSet(); log.debug("Starting reply listener container on endpoint: {}", endpoint); + + endpoint.onListenerContainerStarting(listenerContainer); listenerContainer.start(); } @@ -233,9 +235,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl if (listenerContainer != null) { log.debug("Stopping reply listener container on endpoint: {}", endpoint); - listenerContainer.stop(); - listenerContainer.destroy(); - listenerContainer = null; + try { + listenerContainer.stop(); + listenerContainer.destroy(); + } finally { + endpoint.onListenerConstainerStopped(listenerContainer); + listenerContainer = null; + } } // must also 
stop executor service http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java new file mode 100644 index 0000000..32ce78c --- /dev/null +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +public class TwoConsumerOnSameQueueTest extends CamelTestSupport { + + @Test + public void testTwoConsumerOnSameQueue() throws Exception { + sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert(); + } + + @Test + public void testStopAndStartOneRoute() throws Exception { + sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert(); + + // now stop route A + context.stopRoute("a"); + + // send new message should go to B only + resetMocks(); + + getMockEndpoint("mock:a").expectedMessageCount(0); + getMockEndpoint("mock:b").expectedBodiesReceived("Bye World", "Bye World"); + + template.sendBody("activemq:queue:foo", "Bye World"); + template.sendBody("activemq:queue:foo", "Bye World"); + + assertMockEndpointsSatisfied(); + + // now start route A + context.startRoute("a"); + + // send new message should go to both A and B + resetMocks(); + + sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert(); + } + + @Test + public void testRemoveOneRoute() throws Exception { + sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert(); + + // now stop and remove route A + context.stopRoute("a"); + assertTrue(context.removeRoute("a")); + + // send new message should go to B only + resetMocks(); + + getMockEndpoint("mock:a").expectedMessageCount(0); + getMockEndpoint("mock:b").expectedBodiesReceived("Bye World", "Bye World"); + + template.sendBody("activemq:queue:foo", "Bye World"); + template.sendBody("activemq:queue:foo", "Bye World"); + + assertMockEndpointsSatisfied(); + } + + private void sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert() throws 
InterruptedException { + getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:b").expectedBodiesReceived("Hello World"); + + template.sendBody("activemq:queue:foo", "Hello World"); + template.sendBody("activemq:queue:foo", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); //CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("activemq:queue:foo").routeId("a") + .to("mock:a"); + + from("activemq:queue:foo").routeId("b") + .to("mock:b"); + } + }; + } +}