Author: davsclaus
Date: Fri Jan 28 17:06:59 2011
New Revision: 1064780

URL: http://svn.apache.org/viewvc?rev=1064780&view=rev
Log:
CAMEL-3600: Avoid startings/stopping services multiple times. Now Camel checks 
state and only executes if needed.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Endpoint2MustBeStartedBeforeSendProcessorTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EndpointMustBeStartedBeforeSendProcessorTest.java
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRoute.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AnnotationTypeConverterLoader.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEndpoint.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterEndpointTest.java
    camel/trunk/camel-core/src/test/resources/log4j.properties
    
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
    
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
    
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/cxfbean/CxfBeanEndpoint.java
    
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
    
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
    
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
    
camel/trunk/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
    
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/test/TestEndpoint.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java 
Fri Jan 28 17:06:59 2011
@@ -42,7 +42,7 @@ import org.apache.camel.util.ObjectHelpe
  *
  * @version $Revision$
  */
-public abstract class DefaultEndpoint implements Endpoint, HasId, 
CamelContextAware {
+public abstract class DefaultEndpoint extends ServiceSupport implements 
Endpoint, HasId, CamelContextAware {
 
     //Match any key-value pair in the URI query string whose key contains 
"passphrase" or "password" (case-insensitive).
     //First capture group is the key, second is the value.
@@ -232,11 +232,13 @@ public abstract class DefaultEndpoint im
         return false;
     }
 
-    public void start() throws Exception {
+    @Override
+    protected void doStart() throws Exception {
         // noop
     }
 
-    public void stop() throws Exception {
+    @Override
+    protected void doStop() throws Exception {
         // noop
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRoute.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRoute.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRoute.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRoute.java 
Fri Jan 28 17:06:59 2011
@@ -90,6 +90,7 @@ public abstract class DefaultRoute exten
     }
 
     protected void doStart() throws Exception {
+        // noop
     }
 
     protected void doStop() throws Exception {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java 
Fri Jan 28 17:06:59 2011
@@ -56,6 +56,7 @@ public class RouteService extends Servic
     private boolean removingRoutes;
     private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>();
     private final AtomicBoolean warmUpDone = new AtomicBoolean(false);
+    private final AtomicBoolean endpointpDone = new AtomicBoolean(false);
 
     public RouteService(DefaultCamelContext camelContext, RouteDefinition 
routeDefinition, List<RouteContext> routeContexts, List<Route> routes) {
         this.camelContext = camelContext;
@@ -105,11 +106,20 @@ public class RouteService extends Servic
     }
 
     public synchronized void warmUp() throws Exception {
+        if (endpointpDone.compareAndSet(false, true)) {
+            // endpoints should only be started once as they can be reused on 
other routes
+            // and whatnot, thus their lifecycle is to start once, and only to 
stop when Camel shutdown
+            for (Route route : routes) {
+                // ensure endpoint is started first (before the route 
services, such as the consumer)
+                ServiceHelper.startService(route.getEndpoint());
+            }
+        }
+
         if (warmUpDone.compareAndSet(false, true)) {
 
             for (Route route : routes) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Starting route services: " + route);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Starting services on route: " + route.getId());
                 }
 
                 List<Service> services = route.getServices();
@@ -172,8 +182,8 @@ public class RouteService extends Servic
         }
         
         for (Route route : routes) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Stopping route: " + route);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping services on route: " + route.getId());
             }
             // getServices will not add services again
             List<Service> services = route.getServices();
@@ -187,9 +197,9 @@ public class RouteService extends Servic
 
             // stop the route itself
             if (isShutdownCamelContext) {
-                ServiceHelper.stopAndShutdownService(route);
+                ServiceHelper.stopAndShutdownServices(route);
             } else {
-                ServiceHelper.stopService(route);
+                ServiceHelper.stopServices(route);
             }
 
             // fire event
@@ -203,6 +213,12 @@ public class RouteService extends Servic
 
     @Override
     protected void doShutdown() throws Exception {
+        for (Route route : routes) {
+            // endpoints should only be stopped when Camel is shutting down
+            // so comments in warmUp method
+            ServiceHelper.stopAndShutdownServices(route.getEndpoint());
+        }
+
         // need to call onRoutesRemove when the CamelContext is shutting down 
or Route is shutdown
         for (LifecycleStrategy strategy : 
camelContext.getLifecycleStrategies()) {
             strategy.onRoutesRemove(routes);
@@ -211,6 +227,7 @@ public class RouteService extends Servic
         // clear inputs on shutdown
         inputs.clear();
         warmUpDone.set(false);
+        endpointpDone.set(false);
     }
 
     @Override

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AnnotationTypeConverterLoader.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AnnotationTypeConverterLoader.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AnnotationTypeConverterLoader.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AnnotationTypeConverterLoader.java
 Fri Jan 28 17:06:59 2011
@@ -80,8 +80,8 @@ public class AnnotationTypeConverterLoad
         LOG.info("Found " + packageNames.length + " packages with " + 
classes.size() + " @Converter classes to load");
 
         for (Class type : classes) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Loading converter class: " + 
ObjectHelper.name(type));
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Loading converter class: " + 
ObjectHelper.name(type));
             }
             loadConverterMethods(registry, type);
         }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
 Fri Jan 28 17:06:59 2011
@@ -121,6 +121,10 @@ public class ManagedManagementStrategy e
         } else if (managedObject instanceof ManagedService) {
             // check for managed service should be last
             ManagedService ms = (ManagedService) managedObject;
+            // skip endpoints as they are already managed
+            if (ms.getService() instanceof Endpoint) {
+                return null;
+            }
             objectName = 
getManagementNamingStrategy().getObjectNameForService(ms.getContext(), 
ms.getService());
         }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEndpoint.java
 Fri Jan 28 17:06:59 2011
@@ -17,6 +17,8 @@
 package org.apache.camel.management.mbean;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.ManagementStrategy;
 import org.springframework.jmx.export.annotation.ManagedAttribute;
 import org.springframework.jmx.export.annotation.ManagedResource;
@@ -52,6 +54,22 @@ public class ManagedEndpoint implements 
         return endpoint.isSingleton();
     }
 
+    @ManagedAttribute(description = "Service State")
+    public String getState() {
+        // must use String type to be sure remote JMX can read the attribute 
without requiring Camel classes.
+        if (endpoint instanceof ServiceSupport) {
+            ServiceStatus status = ((ServiceSupport) endpoint).getStatus();
+            // if no status exists then its stopped
+            if (status == null) {
+                status = ServiceStatus.Stopped;
+            }
+            return status.name();
+        }
+
+        // assume started if not a ServiceSupport instance
+        return ServiceStatus.Started.name();
+    }
+
     public Object getInstance() {
         return endpoint;
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
 Fri Jan 28 17:06:59 2011
@@ -141,6 +141,8 @@ public class SendProcessor extends Servi
             destination = lookup;
         }
         // warm up the producer by starting it so we can fail fast if there 
was a problem
+        // however must start endpoint first
+        ServiceHelper.startService(destination);
         producerCache.startProducer(destination);
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java 
Fri Jan 28 17:06:59 2011
@@ -73,4 +73,9 @@ public class LRUCache<K, V> extends Link
             clear();
         }
     }
+
+    @Override
+    public String toString() {
+        return "LRUCache@" + ObjectHelper.getIdentityHashCode(this);
+    }
 }
\ No newline at end of file

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java 
Fri Jan 28 17:06:59 2011
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.camel.Service;
 import org.apache.camel.ShutdownableService;
 import org.apache.camel.SuspendableService;
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -44,6 +45,13 @@ public final class ServiceHelper {
      * Starts all of the given services
      */
     public static void startService(Object value) throws Exception {
+        if (isStarted(value)) {
+            // only start service if not already started
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Service already started: " + value);
+            }
+            return;
+        }
         if (value instanceof Service) {
             Service service = (Service)value;
             if (LOG.isTraceEnabled()) {
@@ -75,13 +83,7 @@ public final class ServiceHelper {
             return;
         }
         for (Object value : services) {
-            if (value instanceof Service) {
-                Service service = (Service)value;
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Starting service: " + service);
-                }
-                service.start();
-            }
+            startService(value);
         }
     }
 
@@ -100,6 +102,13 @@ public final class ServiceHelper {
      * Stops all of the given services, throwing the first exception caught
      */
     public static void stopService(Object value) throws Exception {
+        if (isStopped(value)) {
+            // only stop service if not already stopped
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Service already stopped: " + value);
+            }
+            return;
+        }
         if (value instanceof Service) {
             Service service = (Service)value;
             if (LOG.isTraceEnabled()) {
@@ -120,20 +129,14 @@ public final class ServiceHelper {
         }
         Exception firstException = null;
         for (Object value : services) {
-            if (value instanceof Service) {
-                Service service = (Service)value;
-                try {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Stopping service: " + service);
-                    }
-                    service.stop();
-                } catch (Exception e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Caught exception stopping service: " + 
service, e);
-                    }
-                    if (firstException == null) {
-                        firstException = e;
-                    }
+            try {
+                stopService(value);
+            } catch (Exception e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Caught exception stopping service: " + value, 
e);
+                }
+                if (firstException == null) {
+                    firstException = e;
                 }
             }
         }
@@ -221,10 +224,6 @@ public final class ServiceHelper {
                 Service service = (Service)value;
                 try {
                     resumeService(service);
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Resumed service: " + service);
-                    }
-                    service.stop();
                 } catch (Exception e) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Caught exception resuming service: " + 
service, e);
@@ -260,8 +259,8 @@ public final class ServiceHelper {
         if (service instanceof SuspendableService) {
             SuspendableService ss = (SuspendableService) service;
             if (ss.isSuspended()) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Resuming service " + service);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Resuming service " + service);
                 }
                 ss.resume();
                 return true;
@@ -284,10 +283,6 @@ public final class ServiceHelper {
                 Service service = (Service)value;
                 try {
                     suspendService(service);
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Suspending service: " + service);
-                    }
-                    service.stop();
                 } catch (Exception e) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Caught exception suspending service: " + 
service, e);
@@ -337,4 +332,34 @@ public final class ServiceHelper {
         }
     }
 
+    /**
+     * Is the given service stopping or stopped?
+     *
+     * @return <tt>true</tt> if already stopped, otherwise <tt>false</tt>
+     */
+    public static boolean isStopped(Object value) {
+        if (value instanceof ServiceSupport) {
+            ServiceSupport service = (ServiceSupport) value;
+            if (service.isStopping() || service.isStopped()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Is the given service starting or started?
+     *
+     * @return <tt>true</tt> if already started, otherwise <tt>false</tt>
+     */
+    public static boolean isStarted(Object value) {
+        if (value instanceof ServiceSupport) {
+            ServiceSupport service = (ServiceSupport) value;
+            if (service.isStarting() || service.isStarted()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
 }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterEndpointTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterEndpointTest.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterEndpointTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterEndpointTest.java
 Fri Jan 28 17:06:59 2011
@@ -44,6 +44,9 @@ public class ManagedRegisterEndpointTest
         String id = (String) mbeanServer.getAttribute(name, "CamelId");
         assertEquals("camel-1", id);
 
+        String state = (String) mbeanServer.getAttribute(name, "State");
+        assertEquals("Started", state);
+
         Boolean singleton = (Boolean) mbeanServer.getAttribute(name, 
"Singleton");
         assertEquals(Boolean.TRUE, singleton);
     }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Endpoint2MustBeStartedBeforeSendProcessorTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Endpoint2MustBeStartedBeforeSendProcessorTest.java?rev=1064780&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Endpoint2MustBeStartedBeforeSendProcessorTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Endpoint2MustBeStartedBeforeSendProcessorTest.java
 Fri Jan 28 17:06:59 2011
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+
+/**
+ * @version $Revision$
+ */
+public class Endpoint2MustBeStartedBeforeSendProcessorTest extends 
ContextTestSupport {
+
+    private MyEndpoint myendpoint;
+    private volatile String order = "";
+
+    public void testEndpointMustBeStartedBeforeProducer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myendpoint = new MyEndpoint("myendpoint", context);
+
+                from("direct:start")
+                    .to(myendpoint);
+            }
+        });
+        context.start();
+
+        assertEquals("EndpointProducer", order);
+    }
+
+    public void testEndpointMustBeStartedBeforeConsumer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myendpoint = new MyEndpoint("myendpoint", context);
+
+                from(myendpoint)
+                    .to("mock:result");
+            }
+        });
+        context.start();
+
+        assertEquals("EndpointConsumer", order);
+    }
+
+    public void testEndpointMustBeStartedBeforeConsumerAndProducer() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myendpoint = new MyEndpoint("myendpoint", context);
+
+                from(myendpoint)
+                    .to("mock:result")
+                    .to(myendpoint);
+            }
+        });
+        context.start();
+
+        assertEquals("EndpointProducerConsumer", order);
+    }
+
+    public void testEndpointStartedOnceAndOnlyStoppedOnShutdown() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myendpoint = new MyEndpoint("myendpoint", context);
+
+                from(myendpoint).routeId("foo")
+                    .to("mock:result")
+                    .to(myendpoint);
+            }
+        });
+        context.start();
+
+        assertEquals("EndpointProducerConsumer", order);
+        order = "";
+
+        context.stopRoute("foo");
+        assertEquals("StopConsumerStopProducer", order);
+
+        order = "";
+        context.startRoute("foo");
+        assertEquals("ProducerConsumer", order);
+
+        order = "";
+        context.stop();
+        // will invoke StopEndpoint twice as shutdown will ensure we are 
stopped first
+        assertEquals("StopConsumerStopProducerStopEndpointStopEndpoint", 
order);
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    private final class MyEndpoint extends DefaultEndpoint {
+
+        private MyEndpoint(String endpointUri, CamelContext camelContext) {
+            super(endpointUri, camelContext);
+        }
+
+        public Producer createProducer() throws Exception {
+            return new MyProducer(this);
+        }
+
+        public Consumer createConsumer(Processor processor) throws Exception {
+            return new MyConsumer(this, null);
+        }
+
+        public boolean isSingleton() {
+            return true;
+        }
+
+        // in this test we use start/stop to implement logic
+        // this is however discouraged, as you should prefer to use 
doStart/doStop
+
+        @Override
+        public void start() throws Exception {
+            super.start();
+            order += "Endpoint";
+        }
+
+        @Override
+        public void stop() throws Exception {
+            super.stop();
+            order += "StopEndpoint";
+        }
+    }
+
+    private class MyProducer extends DefaultProducer {
+
+        public MyProducer(Endpoint endpoint) {
+            super(endpoint);
+        }
+
+        public void process(Exchange exchange) throws Exception {
+            // noop
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            order += "Producer";
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            order += "StopProducer";
+        }
+    }
+
+    private class MyConsumer extends DefaultConsumer {
+
+        public MyConsumer(Endpoint endpoint, Processor processor) {
+            super(endpoint, processor);
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            order += "Consumer";
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            order += "StopConsumer";
+        }
+    }
+}

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EndpointMustBeStartedBeforeSendProcessorTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EndpointMustBeStartedBeforeSendProcessorTest.java?rev=1064780&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EndpointMustBeStartedBeforeSendProcessorTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EndpointMustBeStartedBeforeSendProcessorTest.java
 Fri Jan 28 17:06:59 2011
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+
+/**
+ * @version $Revision$
+ */
+public class EndpointMustBeStartedBeforeSendProcessorTest extends 
ContextTestSupport {
+
+    private MyEndpoint myendpoint;
+    private volatile String order = "";
+
+    public void testEndpointMustBeStartedBeforeProducer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myendpoint = new MyEndpoint("myendpoint", context);
+
+                from("direct:start")
+                    .to(myendpoint);
+            }
+        });
+        context.start();
+
+        assertEquals("EndpointProducer", order);
+    }
+
+    public void testEndpointMustBeStartedBeforeConsumer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myendpoint = new MyEndpoint("myendpoint", context);
+
+                from(myendpoint)
+                    .to("mock:result");
+            }
+        });
+        context.start();
+
+        assertEquals("EndpointConsumer", order);
+    }
+
+    public void testEndpointMustBeStartedBeforeConsumerAndProducer() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myendpoint = new MyEndpoint("myendpoint", context);
+
+                from(myendpoint)
+                    .to("mock:result")
+                    .to(myendpoint);
+            }
+        });
+        context.start();
+
+        assertEquals("EndpointProducerConsumer", order);
+    }
+
+    public void testEndpointStartedOnceAndOnlyStoppedOnShutdown() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myendpoint = new MyEndpoint("myendpoint", context);
+
+                from(myendpoint).routeId("foo")
+                    .to("mock:result")
+                    .to(myendpoint);
+            }
+        });
+        context.start();
+
+        assertEquals("EndpointProducerConsumer", order);
+        order = "";
+
+        context.stopRoute("foo");
+        assertEquals("StopConsumerStopProducer", order);
+
+        order = "";
+        context.startRoute("foo");
+        assertEquals("ProducerConsumer", order);
+
+        order = "";
+        context.stop();
+        assertEquals("StopConsumerStopProducerStopEndpoint", order);
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    private final class MyEndpoint extends DefaultEndpoint {
+
+        private MyEndpoint(String endpointUri, CamelContext camelContext) {
+            super(endpointUri, camelContext);
+        }
+
+        public Producer createProducer() throws Exception {
+            return new MyProducer(this);
+        }
+
+        public Consumer createConsumer(Processor processor) throws Exception {
+            return new MyConsumer(this, null);
+        }
+
+        public boolean isSingleton() {
+            return true;
+        }
+
+        @Override
+        public void doStart() throws Exception {
+            order += "Endpoint";
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            order += "StopEndpoint";
+        }
+    }
+
+    private class MyProducer extends DefaultProducer {
+
+        public MyProducer(Endpoint endpoint) {
+            super(endpoint);
+        }
+
+        public void process(Exchange exchange) throws Exception {
+            // noop
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            order += "Producer";
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            order += "StopProducer";
+        }
+    }
+
+    private class MyConsumer extends DefaultConsumer {
+
+        public MyConsumer(Endpoint endpoint, Processor processor) {
+            super(endpoint, processor);
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            order += "Consumer";
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            order += "StopConsumer";
+        }
+    }
+}

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Fri Jan 28 
17:06:59 2011
@@ -47,6 +47,7 @@ log4j.logger.org.apache.camel.impl.Defau
 #log4j.logger.org.apache.camel.impl=TRACE
 #log4j.logger.org.apache.camel.util.FileUtil=TRACE
 #log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE
+#log4j.logger.org.apache.camel.util.ServiceHelper=TRACE
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender

Modified: 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
 Fri Jan 28 17:06:59 2011
@@ -67,11 +67,9 @@ public class SqsEndpoint extends Schedul
     public boolean isSingleton() {
         return true;
     }
-    
+
     @Override
-    public void start() throws Exception {
-        super.start();
-        
+    protected void doStart() throws Exception {
         client = getConfiguration().getAmazonSQSClient() != null
                 ? getConfiguration().getAmazonSQSClient() : getClient();
         
@@ -92,12 +90,10 @@ public class SqsEndpoint extends Schedul
     }
 
     @Override
-    public void stop() throws Exception {
+    protected void doStop() throws Exception {
         client = null;
-        
-        super.stop();
     }
-    
+
     public Exchange createExchange(com.amazonaws.services.sqs.model.Message 
msg) {
         return createExchange(getExchangePattern(), msg);
     }

Modified: 
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
 Fri Jan 28 17:06:59 2011
@@ -548,9 +548,10 @@ public class CxfEndpoint extends Default
 
     public boolean isLoggingFeatureEnabled() {
         return loggingFeatureEnabled;
-    }    
+    }
 
-    public void start() throws Exception {
+    @Override
+    protected void doStart() throws Exception {
         if (headerFilterStrategy == null) {
             headerFilterStrategy = new CxfHeaderFilterStrategy();
         }
@@ -562,7 +563,8 @@ public class CxfEndpoint extends Default
         }
     }
 
-    public void stop() throws Exception {
+    @Override
+    protected void doStop() throws Exception {
         // noop
     }
 

Modified: 
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/cxfbean/CxfBeanEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/cxfbean/CxfBeanEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/cxfbean/CxfBeanEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/cxfbean/CxfBeanEndpoint.java
 Fri Jan 28 17:06:59 2011
@@ -56,15 +56,17 @@ public class CxfBeanEndpoint extends Pro
     public CxfBeanEndpoint(String remaining, CxfBeanComponent component) {
         super(remaining, component);
     }
-    
-    public void stop() {
-        server.stop();
-    }
-    
-    public void start() {
+
+    @Override
+    protected void doStart() throws Exception {
         server.start();
     }
 
+    @Override
+    protected void doStop() throws Exception {
+        server.stop();
+    }
+
     @SuppressWarnings("unchecked")
     public void init() {
         Object obj = CamelContextHelper.mandatoryLookup(getCamelContext(), 
getEndpointUri());

Modified: 
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java
 (original)
+++ 
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java
 Fri Jan 28 17:06:59 2011
@@ -17,15 +17,11 @@
 package org.apache.camel.component.cxf.spring;
 
 import java.util.List;
-
 import javax.xml.ws.handler.Handler;
 
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.frontend.AbstractServiceFactory;
-import org.apache.cxf.frontend.AbstractWSDLBasedEndpointFactory;
 import org.apache.cxf.service.factory.ReflectionServiceFactoryBean;
-import org.apache.cxf.wsdl11.WSDLEndpointFactory;
-
 import org.springframework.beans.factory.BeanNameAware;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.NamedBean;
@@ -57,7 +53,6 @@ public class CxfEndpointBean extends Abs
         // Clean up the BusFactory's defaultBus
         BusFactory.setDefaultBus(null);
         BusFactory.setThreadDefaultBus(null);
-        
     }
 
     public void setBeanName(String name) {

Modified: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 Fri Jan 28 17:06:59 2011
@@ -47,7 +47,6 @@ import org.springframework.transaction.P
 import org.springframework.util.Assert;
 
 import static 
org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
-import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
 
 /**
  * @version $Revision$
@@ -356,7 +355,7 @@ public class JmsConfiguration implements
     }
 
     public AbstractMessageListenerContainer 
createMessageListenerContainer(JmsEndpoint endpoint) throws Exception {
-        AbstractMessageListenerContainer container = 
chooseMessageListenerContainerImplementation();
+        AbstractMessageListenerContainer container = 
chooseMessageListenerContainerImplementation(endpoint);
         configureMessageListenerContainer(container, endpoint);
         return container;
     }
@@ -960,12 +959,13 @@ public class JmsConfiguration implements
         }
     }
 
-    public AbstractMessageListenerContainer 
chooseMessageListenerContainerImplementation() {
+    public AbstractMessageListenerContainer 
chooseMessageListenerContainerImplementation(JmsEndpoint endpoint) {
         switch (consumerType) {
         case Simple:
+            // TODO: simple is @deprecated and should be removed in Camel 2.7 
when we upgrade to Spring 3
             return new SimpleMessageListenerContainer();
         case Default:
-            return new DefaultMessageListenerContainer();
+            return new JmsMessageListenerContainer(endpoint);
         default:
             throw new IllegalArgumentException("Unknown consumer type: " + 
consumerType);
         }
@@ -985,6 +985,7 @@ public class JmsConfiguration implements
      * @return the cache level
      */
     protected int defaultCacheLevel(JmsEndpoint endpoint) {
+        // TODO: upgrade to Spring 3
         // if we are on a new enough spring version we can assume 
CACHE_CONSUMER
         if (PackageHelper.isValidVersion("org.springframework.jms", 2.51D)) {
             return DefaultMessageListenerContainer.CACHE_CONSUMER;

Modified: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 Fri Jan 28 17:06:59 2011
@@ -19,6 +19,7 @@ package org.apache.camel.component.jms;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -72,6 +73,7 @@ public class JmsEndpoint extends Default
     private ReplyManager replyManager;
     // scheduled executor to check for timeout (reply not received)
     private ScheduledExecutorService replyManagerExecutorService;
+    private final AtomicBoolean running = new AtomicBoolean();
 
     public JmsEndpoint() {
         this(null, null);
@@ -373,10 +375,22 @@ public class JmsEndpoint extends Default
         return replyManagerExecutorService;
     }
 
-    public void start() throws Exception {
+    /**
+     * State whether this endpoint is running (eg started)
+     */
+    protected boolean isRunning() {
+        return running.get();
     }
 
-    public void stop() throws Exception {
+    @Override
+    protected void doStart() throws Exception {
+        running.set(true);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        running.set(false);
+
         if (replyManager != null) {
             ServiceHelper.stopService(replyManager);
             replyManager = null;

Added: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java?rev=1064780&view=auto
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java
 (added)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java
 Fri Jan 28 17:06:59 2011
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+/**
+ * The {@link DefaultMessageListenerContainer container} which listen for 
messages
+ * on the JMS destination.
+ * <p/>
+ * This implementation extends Springs {@link DefaultMessageListenerContainer} 
supporting
+ * automatic recovery and throttling.
+ *
+ * @version $Revision$
+ */
+public class JmsMessageListenerContainer extends 
DefaultMessageListenerContainer {
+
+    private final JmsEndpoint endpoint;
+
+    public JmsMessageListenerContainer(JmsEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected boolean runningAllowed() {
+        // do not run if we have been stopped
+        return endpoint.isRunning();
+    }
+}

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
 Fri Jan 28 17:06:59 2011
@@ -79,8 +79,7 @@ public class NettyEndpoint extends Defau
     }
 
     @Override
-    public void start() throws Exception {
-        super.start();
+    protected void doStart() throws Exception {
         ObjectHelper.notNull(timer, "timer");
     }
 

Modified: 
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
 Fri Jan 28 17:06:59 2011
@@ -240,17 +240,21 @@ public class QuartzEndpoint extends Defa
         return new JobDetail();
     }
 
-    public void start() throws Exception {
+    @Override
+    protected void doStart() throws Exception {
         ObjectHelper.notNull(getComponent(), "QuartzComponent", this);
         ServiceHelper.startService(loadBalancer);
     }
 
-    public void stop() throws Exception {
+    @Override
+    protected void doStop() throws Exception {
         ServiceHelper.stopService(loadBalancer);
     }
 
-    public void shutdown() throws Exception {
+    @Override
+    protected void doShutdown() throws Exception {
         ObjectHelper.notNull(trigger, "trigger");
         deleteTrigger(getTrigger());
     }
+
 }

Modified: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
 Fri Jan 28 17:06:59 2011
@@ -27,6 +27,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.quickfixj.converter.QuickfixjConverters;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +43,7 @@ public class QuickfixjEndpoint extends D
     private static final Logger LOG = 
LoggerFactory.getLogger(QuickfixjEndpoint.class);
     
     private SessionID sessionID;
-    private List<QuickfixjConsumer> consumers = new 
CopyOnWriteArrayList<QuickfixjConsumer>();
+    private final List<QuickfixjConsumer> consumers = new 
CopyOnWriteArrayList<QuickfixjConsumer>();
     
     public QuickfixjEndpoint(String uri, CamelContext context) {
         super(uri, context);
@@ -59,8 +60,6 @@ public class QuickfixjEndpoint extends D
     public Consumer createConsumer(Processor processor) throws Exception {
         LOG.info("Creating QuickFIX/J consumer: " + (sessionID != null ? 
sessionID : "No Session"));
         QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor);
-        // TODO The lifecycle mgmt requirements aren't clear to me
-        consumer.start();
         consumers.add(consumer);
         return consumer;
     }
@@ -71,7 +70,6 @@ public class QuickfixjEndpoint extends D
     }
 
     public boolean isSingleton() {
-        // TODO This seems to be incorrect. There can be multiple consumers 
for a session endpoint.
         return true;
     }
 
@@ -90,4 +88,10 @@ public class QuickfixjEndpoint extends D
     public boolean isMultipleConsumersSupported() {
         return true;
     }
+
+    @Override
+    protected void doStop() throws Exception {
+        // clear list of consumers
+        consumers.clear();
+    }
 }

Modified: 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
 Fri Jan 28 17:06:59 2011
@@ -37,6 +37,7 @@ import org.apache.camel.component.quickf
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.impl.converter.StaticMethodTypeConverter;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.mina.common.TransportType;
 import org.junit.After;
 import org.junit.Before;
@@ -224,7 +225,8 @@ public class QuickfixjComponentTest {
                 }
             }
         });
-        
+        ServiceHelper.startService(consumer);
+
         // Endpoint automatically starts the consumer
         assertThat(((ServiceSupport)consumer).isStarted(), is(true));
         
@@ -268,7 +270,7 @@ public class QuickfixjComponentTest {
         final CountDownLatch logonLatch = new CountDownLatch(2);
         final CountDownLatch messageLatch = new CountDownLatch(2);
                 
-        endpoint.createConsumer(new Processor() {
+        Consumer consumer = endpoint.createConsumer(new Processor() {
             public void process(Exchange exchange) throws Exception {
                 QuickfixjEventCategory eventCategory = 
                     (QuickfixjEventCategory) 
exchange.getIn().getHeader(QuickfixjEndpoint.EVENT_CATEGORY_KEY);
@@ -279,7 +281,8 @@ public class QuickfixjComponentTest {
                 }
             }
         });
-        
+        ServiceHelper.startService(consumer);
+
         component.start();
         
         assertTrue("Session not created", logonLatch.await(5000, 
TimeUnit.MILLISECONDS));

Modified: 
camel/trunk/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
 Fri Jan 28 17:06:59 2011
@@ -23,7 +23,6 @@ import org.apache.camel.Consumer;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.Service;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -34,7 +33,7 @@ import org.restlet.data.Method;
  *
  * @version $Revision$
  */
-public class RestletEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware, Service {
+public class RestletEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware {
 
     private static final int DEFAULT_PORT = 80;
     private static final String DEFAULT_PROTOCOL = "http";
@@ -179,7 +178,16 @@ public class RestletEndpoint extends Def
         return restletUriPatterns;
     }
 
-    public void start() throws Exception {
+    public boolean isThrowExceptionOnFailure() {
+        return throwExceptionOnFailure;
+    }
+
+    public void setThrowExceptionOnFailure(boolean throwExceptionOnFailure) {
+        this.throwExceptionOnFailure = throwExceptionOnFailure;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
         if (headerFilterStrategy == null) {
             headerFilterStrategy = new RestletHeaderFilterStrategy();
         }
@@ -191,15 +199,9 @@ public class RestletEndpoint extends Def
         }
     }
 
-    public void stop() throws Exception {
+    @Override
+    protected void doStop() throws Exception {
         // noop
     }
 
-    public boolean isThrowExceptionOnFailure() {
-        return throwExceptionOnFailure;
-    }
-
-    public void setThrowExceptionOnFailure(boolean throwExceptionOnFailure) {
-        this.throwExceptionOnFailure = throwExceptionOnFailure;
-    }
 }

Modified: 
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/test/TestEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/test/TestEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/test/TestEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/test/TestEndpoint.java
 Fri Jan 28 17:06:59 2011
@@ -45,7 +45,8 @@ public class TestEndpoint extends MockEn
         this.expectedMessageEndpoint = expectedMessageEndpoint;
     }
 
-    public void start() throws Exception {
+    @Override
+    protected void doStart() throws Exception {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Consuming expected messages from: " + 
expectedMessageEndpoint);
         }
@@ -62,9 +63,6 @@ public class TestEndpoint extends MockEn
         }
         expectedBodiesReceived(expectedBodies);
     }
-    
-    public void stop() throws Exception {
-    }
 
     /**
      * This method allows us to convert or coerce the expected message body 
into some other type


Reply via email to