CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the 
endpoint if there are still active listeners when a route is stopped.

Conflicts:
        
components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
        
components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bbab8282
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bbab8282
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bbab8282

Branch: refs/heads/camel-2.11.x
Commit: bbab8282cd74cfc04195aa500d621871b1e35232
Parents: 8845baf
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Aug 31 11:29:25 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Aug 31 11:50:09 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsConsumer.java |  11 +-
 .../apache/camel/component/jms/JmsEndpoint.java |  37 ++++--
 .../jms/reply/ReplyManagerSupport.java          |  12 +-
 .../jms/TwoConsumerOnSameQueueTest.java         | 115 +++++++++++++++++++
 4 files changed, 162 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
index 50b7833..7bf6ab5 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
@@ -133,7 +133,8 @@ public class JmsConsumer extends DefaultConsumer implements 
SuspendableService {
         if (listenerContainer == null) {
             createMessageListenerContainer();
         }
-        
+        getEndpoint().onListenerContainerStarting(listenerContainer);
+
         if (getEndpoint().getConfiguration().isAsyncStartListener()) {
             getEndpoint().getAsyncStartStopExecutorService().submit(new 
Runnable() {
                 @Override
@@ -173,8 +174,12 @@ public class JmsConsumer extends DefaultConsumer 
implements SuspendableService {
 
     protected void stopAndDestroyListenerContainer() {
         if (listenerContainer != null) {
-            listenerContainer.stop();
-            listenerContainer.destroy();
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                getEndpoint().onListenerConstainerStopped(listenerContainer);
+            }
         }
         // null container and listener so they are fully re created if this 
consumer is restarted
         // then we will use updated configuration from jms endpoint that may 
have been managed using JMX

http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index 45828f4..5f5570c 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.jms;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -67,6 +67,7 @@ import org.springframework.util.ErrorHandler;
 @ManagedResource(description = "Managed JMS Endpoint")
 public class JmsEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
     protected final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicInteger runningMessageListeners = new AtomicInteger();
     private HeaderFilterStrategy headerFilterStrategy;
     private boolean pubSubDomain;
     private JmsBinding binding;
@@ -74,7 +75,6 @@ public class JmsEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategy
     private Destination destination;
     private String selector;
     private JmsConfiguration configuration;
-    private final AtomicBoolean running = new AtomicBoolean();
 
     public JmsEndpoint() {
         this(null, null);
@@ -434,21 +434,39 @@ public class JmsEndpoint extends DefaultEndpoint 
implements HeaderFilterStrategy
         return getComponent().getAsyncStartStopExecutorService();
     }
 
+    public void onListenerContainerStarting(AbstractMessageListenerContainer 
container) {
+        runningMessageListeners.incrementAndGet();
+    }
+
+    public void onListenerConstainerStopped(AbstractMessageListenerContainer 
container) {
+        runningMessageListeners.decrementAndGet();
+    }
+
     /**
      * State whether this endpoint is running (eg started)
      */
     protected boolean isRunning() {
-        return running.get();
+        return isStarted();
     }
 
     @Override
-    protected void doStart() throws Exception {
-        running.set(true);
+    public void stop() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.stop();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot 
stop endpoint {}", running, this);
+        }
     }
 
     @Override
-    protected void doStop() throws Exception {
-        running.set(false);
+    public void shutdown() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.shutdown();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot 
shutdown endpoint {}", running, this);
+        }
     }
 
     // Delegated properties from the configuration
@@ -1138,6 +1156,11 @@ public class JmsEndpoint extends DefaultEndpoint 
implements HeaderFilterStrategy
         return status.name();
     }
 
+    @ManagedAttribute(description = "Number of running message listeners")
+    public int getRunningMessageListeners() {
+        return runningMessageListeners.get();
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 3828926..173d9c8 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -224,6 +224,8 @@ public abstract class ReplyManagerSupport extends 
ServiceSupport implements Repl
         listenerContainer = createListenerContainer();
         listenerContainer.afterPropertiesSet();
         log.debug("Starting reply listener container on endpoint: {}", 
endpoint);
+
+        endpoint.onListenerContainerStarting(listenerContainer);
         listenerContainer.start();
     }
 
@@ -233,9 +235,13 @@ public abstract class ReplyManagerSupport extends 
ServiceSupport implements Repl
 
         if (listenerContainer != null) {
             log.debug("Stopping reply listener container on endpoint: {}", 
endpoint);
-            listenerContainer.stop();
-            listenerContainer.destroy();
-            listenerContainer = null;
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                endpoint.onListenerConstainerStopped(listenerContainer);
+                listenerContainer = null;
+            }
         }
 
         // must also stop executor service

http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
new file mode 100644
index 0000000..32ce78c
--- /dev/null
+++ 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+public class TwoConsumerOnSameQueueTest extends CamelTestSupport {
+
+    @Test
+    public void testTwoConsumerOnSameQueue() throws Exception {
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+    }
+
+    @Test
+    public void testStopAndStartOneRoute() throws Exception {
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+
+        // now stop route A
+        context.stopRoute("a");
+
+        // send new message should go to B only
+        resetMocks();
+
+        getMockEndpoint("mock:a").expectedMessageCount(0);
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World", "Bye 
World");
+
+        template.sendBody("activemq:queue:foo", "Bye World");
+        template.sendBody("activemq:queue:foo", "Bye World");
+
+        assertMockEndpointsSatisfied();
+
+        // now start route A
+        context.startRoute("a");
+
+        // send new message should go to both A and B
+        resetMocks();
+
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+    }
+
+    @Test
+    public void testRemoveOneRoute() throws Exception {
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+
+        // now stop and remove route A
+        context.stopRoute("a");
+        assertTrue(context.removeRoute("a"));
+
+        // send new message should go to B only
+        resetMocks();
+
+        getMockEndpoint("mock:a").expectedMessageCount(0);
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World", "Bye 
World");
+
+        template.sendBody("activemq:queue:foo", "Bye World");
+        template.sendBody("activemq:queue:foo", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    private void sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert() 
throws InterruptedException {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
+
+        template.sendBody("activemq:queue:foo", "Hello World");
+        template.sendBody("activemq:queue:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 
//CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", 
jmsComponentAutoAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:foo").routeId("a")
+                     .to("mock:a");
+  
+                from("activemq:queue:foo").routeId("b")
+                     .to("mock:b");
+            }
+        };
+    }
+}

Reply via email to