Repository: camel
Updated Branches:
  refs/heads/master c1a49b67b -> f5e8f33fb


CAMEL-7604: Producers from static EIPs is now also enlisted in JMX during 
starting routes.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/439b0872
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/439b0872
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/439b0872

Branch: refs/heads/master
Commit: 439b0872e23598a9a929b31b6be0a763795f6611
Parents: c1a49b6
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Jul 20 12:25:17 2014 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Jul 21 08:06:01 2014 +0200

----------------------------------------------------------------------
 .../apache/camel/impl/DefaultCamelContext.java  | 62 +++++++++-------
 .../apache/camel/impl/EmptyProducerCache.java   | 13 +++-
 .../org/apache/camel/impl/ProducerCache.java    | 20 ++++-
 .../DefaultManagementLifecycleStrategy.java     |  4 +-
 .../JmxInstrumentationUsingDefaultsTest.java    |  2 +-
 .../camel/management/ManagedProducerTest.java   | 77 ++++++++++++++++++++
 .../management/ManagedRouteAddRemoveTest.java   | 45 +++++++++++-
 7 files changed, 184 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/439b0872/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 4880a82..d341375 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -1993,40 +1993,48 @@ public class DefaultCamelContext extends ServiceSupport 
implements ModelCamelCon
      */
     protected void doStartOrResumeRoutes(Map<String, RouteService> 
routeServices, boolean checkClash,
                                          boolean startConsumer, boolean 
resumeConsumer, boolean addingRoutes) throws Exception {
-        // filter out already started routes
-        Map<String, RouteService> filtered = new LinkedHashMap<String, 
RouteService>();
-        for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) 
{
-            boolean startable = false;
+        isStartingRoutes.set(true);
+        try {
+            // filter out already started routes
+            Map<String, RouteService> filtered = new LinkedHashMap<String, 
RouteService>();
+            for (Map.Entry<String, RouteService> entry : 
routeServices.entrySet()) {
+                boolean startable = false;
+
+                Consumer consumer = 
entry.getValue().getRoutes().iterator().next().getConsumer();
+                if (consumer instanceof SuspendableService) {
+                    // consumer could be suspended, which is not reflected in 
the RouteService status
+                    startable = ((SuspendableService) consumer).isSuspended();
+                }
 
-            Consumer consumer = 
entry.getValue().getRoutes().iterator().next().getConsumer();
-            if (consumer instanceof SuspendableService) {
-                // consumer could be suspended, which is not reflected in the 
RouteService status
-                startable = ((SuspendableService) consumer).isSuspended();
-            }
+                if (!startable && consumer instanceof StatefulService) {
+                    // consumer could be stopped, which is not reflected in 
the RouteService status
+                    startable = ((StatefulService) 
consumer).getStatus().isStartable();
+                } else if (!startable) {
+                    // no consumer so use state from route service
+                    startable = entry.getValue().getStatus().isStartable();
+                }
 
-            if (!startable && consumer instanceof StatefulService) {
-                // consumer could be stopped, which is not reflected in the 
RouteService status
-                startable = ((StatefulService) 
consumer).getStatus().isStartable();
-            } else if (!startable) {
-                // no consumer so use state from route service
-                startable = entry.getValue().getStatus().isStartable();
+                if (startable) {
+                    filtered.put(entry.getKey(), entry.getValue());
+                }
             }
 
-            if (startable) {
-                filtered.put(entry.getKey(), entry.getValue());
+            if (!filtered.isEmpty()) {
+                // the context is now considered started (i.e. isStarted() == 
true))
+                // starting routes is done after, not during context startup
+                safelyStartRouteServices(checkClash, startConsumer, 
resumeConsumer, addingRoutes, filtered.values());
             }
-        }
 
-        if (!filtered.isEmpty()) {
-            // the context is now considered started (i.e. isStarted() == 
true))
-            // starting routes is done after, not during context startup
-            safelyStartRouteServices(checkClash, startConsumer, 
resumeConsumer, addingRoutes, filtered.values());
-        }
+            // we are finished starting routes, so remove flag before we emit 
the startup listeners below
+            isStartingRoutes.remove();
 
-        // now notify any startup aware listeners as all the routes etc has 
been started,
-        // allowing the listeners to do custom work after routes has been 
started
-        for (StartupListener startup : startupListeners) {
-            startup.onCamelContextStarted(this, isStarted());
+            // now notify any startup aware listeners as all the routes etc 
has been started,
+            // allowing the listeners to do custom work after routes has been 
started
+            for (StartupListener startup : startupListeners) {
+                startup.onCamelContextStarted(this, isStarted());
+            }
+        } finally {
+            isStartingRoutes.remove();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/439b0872/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java 
b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
index 823bf35..56a7582 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
@@ -34,11 +34,18 @@ public class EmptyProducerCache extends ProducerCache {
     @Override
     public Producer acquireProducer(Endpoint endpoint) {
         // always create a new producer
-        Producer answer = null;
+        Producer answer;
         try {
             answer = endpoint.createProducer();
-            // must then start service so producer is ready to be used
-            ServiceHelper.startService(answer);
+            if (getCamelContext().isStartingRoutes() && answer.isSingleton()) {
+                // if we are currently starting a route, then add as service 
and enlist in JMX
+                // - but do not enlist non-singletons in JMX
+                // - note addService will also start the service
+                getCamelContext().addService(answer);
+            } else {
+                // must then start service so producer is ready to be used
+                ServiceHelper.startService(answer);
+            }
         } catch (Exception e) {
             throw new FailedToCreateProducerException(endpoint, e);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/439b0872/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index dd3b0a0..95347af 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -403,8 +403,15 @@ public class ProducerCache extends ServiceSupport {
             // create a new producer
             try {
                 answer = endpoint.createProducer();
-                // must then start service so producer is ready to be used
-                ServiceHelper.startService(answer);
+                if (getCamelContext().isStartingRoutes() && 
answer.isSingleton()) {
+                    // if we are currently starting a route, then add as 
service and enlist in JMX
+                    // - but do not enlist non-singletons in JMX
+                    // - note addService will also start the service
+                    getCamelContext().addService(answer);
+                } else {
+                    // must then start service so producer is ready to be used
+                    ServiceHelper.startService(answer);
+                }
             } catch (Exception e) {
                 throw new FailedToCreateProducerException(endpoint, e);
             }
@@ -430,7 +437,14 @@ public class ProducerCache extends ServiceSupport {
     protected void doStop() throws Exception {
         // when stopping we intend to shutdown
         ServiceHelper.stopAndShutdownService(pool);
-        ServiceHelper.stopAndShutdownServices(producers.values());
+        try {
+            ServiceHelper.stopAndShutdownServices(producers.values());
+        } finally {
+            // ensure producers are removed, and also from JMX
+            for (Producer producer : producers.values()) {
+                getCamelContext().removeService(producer);
+            }
+        }
         producers.clear();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/439b0872/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
index 38d8b65..223a987 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
@@ -807,7 +807,7 @@ public class DefaultManagementLifecycleStrategy extends 
ServiceSupport implement
      */
     protected void manageObject(Object me) throws Exception {
         getManagementStrategy().manageObject(me);
-        if (timerListenerManager != null && me instanceof TimerListener) {
+        if (me instanceof TimerListener) {
             TimerListener timer = (TimerListener) me;
             timerListenerManager.addTimerListener(timer);
         }
@@ -820,7 +820,7 @@ public class DefaultManagementLifecycleStrategy extends 
ServiceSupport implement
      * @throws Exception is thrown if error unregistering the managed object
      */
     protected void unmanageObject(Object me) throws Exception {
-        if (timerListenerManager != null && me instanceof TimerListener) {
+        if (me instanceof TimerListener) {
             TimerListener timer = (TimerListener) me;
             timerListenerManager.removeTimerListener(timer);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/439b0872/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
index 7f426d7..63a4a67 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
@@ -79,7 +79,7 @@ public class JmxInstrumentationUsingDefaultsTest extends 
ContextTestSupport {
         assertEquals("Could not find 1 consumers: " + s, 1, s.size());
 
         s = mbsc.queryNames(new ObjectName(domainName + ":type=producers,*"), 
null);
-        assertEquals("Could not find 2 producers: " + s, 0, s.size());
+        assertEquals("Could not find 1 producers: " + s, 1, s.size());
 
         s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), 
null);
         assertEquals("Could not find 1 route: " + s, 1, s.size());

http://git-wip-us.apache.org/repos/asf/camel/blob/439b0872/camel-core/src/test/java/org/apache/camel/management/ManagedProducerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerTest.java 
b/camel-core/src/test/java/org/apache/camel/management/ManagedProducerTest.java
new file mode 100644
index 0000000..5cfcd8e
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedProducerTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.Iterator;
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version
+ */
+public class ManagedProducerTest extends ManagementTestSupport {
+
+    public void testProducer() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // fire a message to get it running
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        template.sendBody("direct:start", "Hello World");
+        assertMockEndpointsSatisfied();
+
+        MBeanServer mbeanServer = getMBeanServer();
+
+        Set<ObjectName> set = mbeanServer.queryNames(new 
ObjectName("*:type=producers,*"), null);
+        assertEquals(2, set.size());
+        Iterator<ObjectName> it = set.iterator();
+
+        for (int i = 0; i < 2; i++) {
+            ObjectName on = it.next();
+
+            boolean registered = mbeanServer.isRegistered(on);
+            assertEquals("Should be registered", true, registered);
+
+            String uri = (String) mbeanServer.getAttribute(on, "EndpointUri");
+            assertTrue(uri, uri.equals("log://foo") || 
uri.equals("mock://result"));
+
+            // should be started
+            String state = (String) mbeanServer.getAttribute(on, "State");
+            assertEquals("Should be started", ServiceStatus.Started.name(), 
state);
+
+            String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
+            assertEquals("foo", routeId);
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("direct:start").routeId("foo").to("log:foo").to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/439b0872/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
index a58fdbf..58c0176 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
@@ -55,12 +55,17 @@ public class ManagedRouteAddRemoveTest extends 
ManagementTestSupport {
         result.assertIsSatisfied();
 
         MBeanServer mbeanServer = getMBeanServer();
-        ObjectName on = 
ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,*");
 
         // number of services
+        ObjectName on = 
ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,*");
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
         assertEquals(7, names.size());
-        
+
+        // number of producers
+        ObjectName onP = 
ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
+        Set<ObjectName> namesP = mbeanServer.queryNames(onP, null);
+        assertEquals(1, namesP.size());
+
         log.info("Adding 2nd route");
 
         // add a 2nd route
@@ -81,6 +86,10 @@ public class ManagedRouteAddRemoveTest extends 
ManagementTestSupport {
         names = mbeanServer.queryNames(on, null);
         assertEquals(7, names.size());
 
+        // but we should have one more producer
+        namesP = mbeanServer.queryNames(onP, null);
+        assertEquals(2, namesP.size());
+
         log.info("Removing 2nd route");
 
         // now remove the 2nd route
@@ -92,6 +101,10 @@ public class ManagedRouteAddRemoveTest extends 
ManagementTestSupport {
         names = mbeanServer.queryNames(on, null);
         assertEquals(7, names.size());
 
+        // and the 2nd producer should be removed
+        namesP = mbeanServer.queryNames(onP, null);
+        assertEquals(1, namesP.size());
+
         log.info("Shutting down...");
     }
 
@@ -107,7 +120,12 @@ public class ManagedRouteAddRemoveTest extends 
ManagementTestSupport {
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
         assertEquals(7, names.size());
-        
+
+        // number of producers
+        ObjectName onP = 
ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
+        Set<ObjectName> namesP = mbeanServer.queryNames(onP, null);
+        assertEquals(1, namesP.size());
+
         log.info("Adding 2nd route");
 
         // add a 2nd route
@@ -128,6 +146,10 @@ public class ManagedRouteAddRemoveTest extends 
ManagementTestSupport {
         names = mbeanServer.queryNames(on, null);
         assertEquals(7, names.size());
 
+        // but as its recipient list which is dynamic-to we do not add a new 
producer
+        namesP = mbeanServer.queryNames(onP, null);
+        assertEquals(1, namesP.size());
+
         log.info("Removing 2nd route");
 
         // now remove the 2nd route
@@ -139,6 +161,10 @@ public class ManagedRouteAddRemoveTest extends 
ManagementTestSupport {
         names = mbeanServer.queryNames(on, null);
         assertEquals(7, names.size());
 
+        // and we still have the original producer
+        namesP = mbeanServer.queryNames(onP, null);
+        assertEquals(1, namesP.size());
+
         log.info("Shutting down...");
     }
 
@@ -155,6 +181,11 @@ public class ManagedRouteAddRemoveTest extends 
ManagementTestSupport {
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
         assertEquals(7, names.size());
 
+        // number of producers
+        ObjectName onP = 
ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
+        Set<ObjectName> namesP = mbeanServer.queryNames(onP, null);
+        assertEquals(1, namesP.size());
+
         log.info("Adding 2nd route");
 
         // add a 2nd route
@@ -175,6 +206,10 @@ public class ManagedRouteAddRemoveTest extends 
ManagementTestSupport {
         names = mbeanServer.queryNames(on, null);
         assertEquals(7, names.size());
 
+        // but as its recipient list which is dynamic-to we do not add a new 
producer
+        namesP = mbeanServer.queryNames(onP, null);
+        assertEquals(1, namesP.size());
+
         log.info("Removing 2nd route");
 
         // now remove the 2nd route
@@ -186,6 +221,10 @@ public class ManagedRouteAddRemoveTest extends 
ManagementTestSupport {
         names = mbeanServer.queryNames(on, null);
         assertEquals(7, names.size());
 
+        // and we still have the original producer
+        namesP = mbeanServer.queryNames(onP, null);
+        assertEquals(1, namesP.size());
+
         log.info("Shutting down...");
     }
 

Reply via email to