Author: davsclaus Date: Fri Jan 28 17:06:59 2011 New Revision: 1064780 URL: http://svn.apache.org/viewvc?rev=1064780&view=rev Log: CAMEL-3600: Avoid startings/stopping services multiple times. Now Camel checks state and only executes if needed.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Endpoint2MustBeStartedBeforeSendProcessorTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EndpointMustBeStartedBeforeSendProcessorTest.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRoute.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AnnotationTypeConverterLoader.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterEndpointTest.java camel/trunk/camel-core/src/test/resources/log4j.properties camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/cxfbean/CxfBeanEndpoint.java camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java camel/trunk/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/test/TestEndpoint.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Fri Jan 28 17:06:59 2011 @@ -42,7 +42,7 @@ import org.apache.camel.util.ObjectHelpe * * @version $Revision$ */ -public abstract class DefaultEndpoint implements Endpoint, HasId, CamelContextAware { +public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware { //Match any key-value pair in the URI query string whose key contains "passphrase" or "password" (case-insensitive). //First capture group is the key, second is the value. @@ -232,11 +232,13 @@ public abstract class DefaultEndpoint im return false; } - public void start() throws Exception { + @Override + protected void doStart() throws Exception { // noop } - public void stop() throws Exception { + @Override + protected void doStop() throws Exception { // noop } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRoute.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRoute.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRoute.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRoute.java Fri Jan 28 17:06:59 2011 @@ -90,6 +90,7 @@ public abstract class DefaultRoute exten } protected void doStart() throws Exception { + // noop } protected void doStop() throws Exception { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Fri Jan 28 17:06:59 2011 @@ -56,6 +56,7 @@ public class RouteService extends Servic private boolean removingRoutes; private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>(); private final AtomicBoolean warmUpDone = new AtomicBoolean(false); + private final AtomicBoolean endpointpDone = new AtomicBoolean(false); public RouteService(DefaultCamelContext camelContext, RouteDefinition routeDefinition, List<RouteContext> routeContexts, List<Route> routes) { this.camelContext = camelContext; @@ -105,11 +106,20 @@ public class RouteService extends Servic } public synchronized void warmUp() throws Exception { + if (endpointpDone.compareAndSet(false, true)) { + // endpoints should only be started once as they can be reused on other routes + // and whatnot, thus their lifecycle is to start once, and only to stop when Camel shutdown + for (Route route : routes) { + // ensure endpoint is started first (before the route services, such as the consumer) + ServiceHelper.startService(route.getEndpoint()); + } + } + if (warmUpDone.compareAndSet(false, true)) { for (Route route : routes) { - if (LOG.isTraceEnabled()) { - LOG.trace("Starting route services: " + route); + if (LOG.isDebugEnabled()) { + LOG.debug("Starting services on route: " + route.getId()); } List<Service> services = route.getServices(); @@ -172,8 +182,8 @@ public class RouteService extends Servic } for (Route route : routes) { - if (LOG.isTraceEnabled()) { - LOG.trace("Stopping route: " + route); + if (LOG.isDebugEnabled()) { + LOG.debug("Stopping services on route: " + route.getId()); } // getServices will not add services again List<Service> services = route.getServices(); @@ -187,9 +197,9 @@ public class RouteService extends Servic // stop the route itself if (isShutdownCamelContext) { - ServiceHelper.stopAndShutdownService(route); + ServiceHelper.stopAndShutdownServices(route); } else { - ServiceHelper.stopService(route); + ServiceHelper.stopServices(route); } // fire event @@ -203,6 +213,12 @@ public class RouteService extends Servic @Override protected void doShutdown() throws Exception { + for (Route route : routes) { + // endpoints should only be stopped when Camel is shutting down + // so comments in warmUp method + ServiceHelper.stopAndShutdownServices(route.getEndpoint()); + } + // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { strategy.onRoutesRemove(routes); @@ -211,6 +227,7 @@ public class RouteService extends Servic // clear inputs on shutdown inputs.clear(); warmUpDone.set(false); + endpointpDone.set(false); } @Override Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AnnotationTypeConverterLoader.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AnnotationTypeConverterLoader.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AnnotationTypeConverterLoader.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AnnotationTypeConverterLoader.java Fri Jan 28 17:06:59 2011 @@ -80,8 +80,8 @@ public class AnnotationTypeConverterLoad LOG.info("Found " + packageNames.length + " packages with " + classes.size() + " @Converter classes to load"); for (Class type : classes) { - if (LOG.isDebugEnabled()) { - LOG.debug("Loading converter class: " + ObjectHelper.name(type)); + if (LOG.isTraceEnabled()) { + LOG.trace("Loading converter class: " + ObjectHelper.name(type)); } loadConverterMethods(registry, type); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java Fri Jan 28 17:06:59 2011 @@ -121,6 +121,10 @@ public class ManagedManagementStrategy e } else if (managedObject instanceof ManagedService) { // check for managed service should be last ManagedService ms = (ManagedService) managedObject; + // skip endpoints as they are already managed + if (ms.getService() instanceof Endpoint) { + return null; + } objectName = getManagementNamingStrategy().getObjectNameForService(ms.getContext(), ms.getService()); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEndpoint.java Fri Jan 28 17:06:59 2011 @@ -17,6 +17,8 @@ package org.apache.camel.management.mbean; import org.apache.camel.Endpoint; +import org.apache.camel.ServiceStatus; +import org.apache.camel.impl.ServiceSupport; import org.apache.camel.spi.ManagementStrategy; import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.jmx.export.annotation.ManagedResource; @@ -52,6 +54,22 @@ public class ManagedEndpoint implements return endpoint.isSingleton(); } + @ManagedAttribute(description = "Service State") + public String getState() { + // must use String type to be sure remote JMX can read the attribute without requiring Camel classes. + if (endpoint instanceof ServiceSupport) { + ServiceStatus status = ((ServiceSupport) endpoint).getStatus(); + // if no status exists then its stopped + if (status == null) { + status = ServiceStatus.Stopped; + } + return status.name(); + } + + // assume started if not a ServiceSupport instance + return ServiceStatus.Started.name(); + } + public Object getInstance() { return endpoint; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Fri Jan 28 17:06:59 2011 @@ -141,6 +141,8 @@ public class SendProcessor extends Servi destination = lookup; } // warm up the producer by starting it so we can fail fast if there was a problem + // however must start endpoint first + ServiceHelper.startService(destination); producerCache.startProducer(destination); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java Fri Jan 28 17:06:59 2011 @@ -73,4 +73,9 @@ public class LRUCache<K, V> extends Link clear(); } } + + @Override + public String toString() { + return "LRUCache@" + ObjectHelper.getIdentityHashCode(this); + } } \ No newline at end of file Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Fri Jan 28 17:06:59 2011 @@ -23,6 +23,7 @@ import java.util.List; import org.apache.camel.Service; import org.apache.camel.ShutdownableService; import org.apache.camel.SuspendableService; +import org.apache.camel.impl.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +45,13 @@ public final class ServiceHelper { * Starts all of the given services */ public static void startService(Object value) throws Exception { + if (isStarted(value)) { + // only start service if not already started + if (LOG.isTraceEnabled()) { + LOG.trace("Service already started: " + value); + } + return; + } if (value instanceof Service) { Service service = (Service)value; if (LOG.isTraceEnabled()) { @@ -75,13 +83,7 @@ public final class ServiceHelper { return; } for (Object value : services) { - if (value instanceof Service) { - Service service = (Service)value; - if (LOG.isTraceEnabled()) { - LOG.trace("Starting service: " + service); - } - service.start(); - } + startService(value); } } @@ -100,6 +102,13 @@ public final class ServiceHelper { * Stops all of the given services, throwing the first exception caught */ public static void stopService(Object value) throws Exception { + if (isStopped(value)) { + // only stop service if not already stopped + if (LOG.isTraceEnabled()) { + LOG.trace("Service already stopped: " + value); + } + return; + } if (value instanceof Service) { Service service = (Service)value; if (LOG.isTraceEnabled()) { @@ -120,20 +129,14 @@ public final class ServiceHelper { } Exception firstException = null; for (Object value : services) { - if (value instanceof Service) { - Service service = (Service)value; - try { - if (LOG.isTraceEnabled()) { - LOG.trace("Stopping service: " + service); - } - service.stop(); - } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception stopping service: " + service, e); - } - if (firstException == null) { - firstException = e; - } + try { + stopService(value); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Caught exception stopping service: " + value, e); + } + if (firstException == null) { + firstException = e; } } } @@ -221,10 +224,6 @@ public final class ServiceHelper { Service service = (Service)value; try { resumeService(service); - if (LOG.isTraceEnabled()) { - LOG.trace("Resumed service: " + service); - } - service.stop(); } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Caught exception resuming service: " + service, e); @@ -260,8 +259,8 @@ public final class ServiceHelper { if (service instanceof SuspendableService) { SuspendableService ss = (SuspendableService) service; if (ss.isSuspended()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Resuming service " + service); + if (LOG.isDebugEnabled()) { + LOG.debug("Resuming service " + service); } ss.resume(); return true; @@ -284,10 +283,6 @@ public final class ServiceHelper { Service service = (Service)value; try { suspendService(service); - if (LOG.isTraceEnabled()) { - LOG.trace("Suspending service: " + service); - } - service.stop(); } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Caught exception suspending service: " + service, e); @@ -337,4 +332,34 @@ public final class ServiceHelper { } } + /** + * Is the given service stopping or stopped? + * + * @return <tt>true</tt> if already stopped, otherwise <tt>false</tt> + */ + public static boolean isStopped(Object value) { + if (value instanceof ServiceSupport) { + ServiceSupport service = (ServiceSupport) value; + if (service.isStopping() || service.isStopped()) { + return true; + } + } + return false; + } + + /** + * Is the given service starting or started? + * + * @return <tt>true</tt> if already started, otherwise <tt>false</tt> + */ + public static boolean isStarted(Object value) { + if (value instanceof ServiceSupport) { + ServiceSupport service = (ServiceSupport) value; + if (service.isStarting() || service.isStarted()) { + return true; + } + } + return false; + } + } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterEndpointTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterEndpointTest.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterEndpointTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterEndpointTest.java Fri Jan 28 17:06:59 2011 @@ -44,6 +44,9 @@ public class ManagedRegisterEndpointTest String id = (String) mbeanServer.getAttribute(name, "CamelId"); assertEquals("camel-1", id); + String state = (String) mbeanServer.getAttribute(name, "State"); + assertEquals("Started", state); + Boolean singleton = (Boolean) mbeanServer.getAttribute(name, "Singleton"); assertEquals(Boolean.TRUE, singleton); } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Endpoint2MustBeStartedBeforeSendProcessorTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Endpoint2MustBeStartedBeforeSendProcessorTest.java?rev=1064780&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Endpoint2MustBeStartedBeforeSendProcessorTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Endpoint2MustBeStartedBeforeSendProcessorTest.java Fri Jan 28 17:06:59 2011 @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.impl.DefaultProducer; + +/** + * @version $Revision$ + */ +public class Endpoint2MustBeStartedBeforeSendProcessorTest extends ContextTestSupport { + + private MyEndpoint myendpoint; + private volatile String order = ""; + + public void testEndpointMustBeStartedBeforeProducer() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + myendpoint = new MyEndpoint("myendpoint", context); + + from("direct:start") + .to(myendpoint); + } + }); + context.start(); + + assertEquals("EndpointProducer", order); + } + + public void testEndpointMustBeStartedBeforeConsumer() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + myendpoint = new MyEndpoint("myendpoint", context); + + from(myendpoint) + .to("mock:result"); + } + }); + context.start(); + + assertEquals("EndpointConsumer", order); + } + + public void testEndpointMustBeStartedBeforeConsumerAndProducer() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + myendpoint = new MyEndpoint("myendpoint", context); + + from(myendpoint) + .to("mock:result") + .to(myendpoint); + } + }); + context.start(); + + assertEquals("EndpointProducerConsumer", order); + } + + public void testEndpointStartedOnceAndOnlyStoppedOnShutdown() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + myendpoint = new MyEndpoint("myendpoint", context); + + from(myendpoint).routeId("foo") + .to("mock:result") + .to(myendpoint); + } + }); + context.start(); + + assertEquals("EndpointProducerConsumer", order); + order = ""; + + context.stopRoute("foo"); + assertEquals("StopConsumerStopProducer", order); + + order = ""; + context.startRoute("foo"); + assertEquals("ProducerConsumer", order); + + order = ""; + context.stop(); + // will invoke StopEndpoint twice as shutdown will ensure we are stopped first + assertEquals("StopConsumerStopProducerStopEndpointStopEndpoint", order); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + private final class MyEndpoint extends DefaultEndpoint { + + private MyEndpoint(String endpointUri, CamelContext camelContext) { + super(endpointUri, camelContext); + } + + public Producer createProducer() throws Exception { + return new MyProducer(this); + } + + public Consumer createConsumer(Processor processor) throws Exception { + return new MyConsumer(this, null); + } + + public boolean isSingleton() { + return true; + } + + // in this test we use start/stop to implement logic + // this is however discouraged, as you should prefer to use doStart/doStop + + @Override + public void start() throws Exception { + super.start(); + order += "Endpoint"; + } + + @Override + public void stop() throws Exception { + super.stop(); + order += "StopEndpoint"; + } + } + + private class MyProducer extends DefaultProducer { + + public MyProducer(Endpoint endpoint) { + super(endpoint); + } + + public void process(Exchange exchange) throws Exception { + // noop + } + + @Override + protected void doStart() throws Exception { + order += "Producer"; + } + + @Override + protected void doStop() throws Exception { + order += "StopProducer"; + } + } + + private class MyConsumer extends DefaultConsumer { + + public MyConsumer(Endpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + protected void doStart() throws Exception { + order += "Consumer"; + } + + @Override + protected void doStop() throws Exception { + order += "StopConsumer"; + } + } +} Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EndpointMustBeStartedBeforeSendProcessorTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EndpointMustBeStartedBeforeSendProcessorTest.java?rev=1064780&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EndpointMustBeStartedBeforeSendProcessorTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EndpointMustBeStartedBeforeSendProcessorTest.java Fri Jan 28 17:06:59 2011 @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.impl.DefaultProducer; + +/** + * @version $Revision$ + */ +public class EndpointMustBeStartedBeforeSendProcessorTest extends ContextTestSupport { + + private MyEndpoint myendpoint; + private volatile String order = ""; + + public void testEndpointMustBeStartedBeforeProducer() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + myendpoint = new MyEndpoint("myendpoint", context); + + from("direct:start") + .to(myendpoint); + } + }); + context.start(); + + assertEquals("EndpointProducer", order); + } + + public void testEndpointMustBeStartedBeforeConsumer() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + myendpoint = new MyEndpoint("myendpoint", context); + + from(myendpoint) + .to("mock:result"); + } + }); + context.start(); + + assertEquals("EndpointConsumer", order); + } + + public void testEndpointMustBeStartedBeforeConsumerAndProducer() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + myendpoint = new MyEndpoint("myendpoint", context); + + from(myendpoint) + .to("mock:result") + .to(myendpoint); + } + }); + context.start(); + + assertEquals("EndpointProducerConsumer", order); + } + + public void testEndpointStartedOnceAndOnlyStoppedOnShutdown() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + myendpoint = new MyEndpoint("myendpoint", context); + + from(myendpoint).routeId("foo") + .to("mock:result") + .to(myendpoint); + } + }); + context.start(); + + assertEquals("EndpointProducerConsumer", order); + order = ""; + + context.stopRoute("foo"); + assertEquals("StopConsumerStopProducer", order); + + order = ""; + context.startRoute("foo"); + assertEquals("ProducerConsumer", order); + + order = ""; + context.stop(); + assertEquals("StopConsumerStopProducerStopEndpoint", order); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + private final class MyEndpoint extends DefaultEndpoint { + + private MyEndpoint(String endpointUri, CamelContext camelContext) { + super(endpointUri, camelContext); + } + + public Producer createProducer() throws Exception { + return new MyProducer(this); + } + + public Consumer createConsumer(Processor processor) throws Exception { + return new MyConsumer(this, null); + } + + public boolean isSingleton() { + return true; + } + + @Override + public void doStart() throws Exception { + order += "Endpoint"; + } + + @Override + protected void doStop() throws Exception { + order += "StopEndpoint"; + } + } + + private class MyProducer extends DefaultProducer { + + public MyProducer(Endpoint endpoint) { + super(endpoint); + } + + public void process(Exchange exchange) throws Exception { + // noop + } + + @Override + protected void doStart() throws Exception { + order += "Producer"; + } + + @Override + protected void doStop() throws Exception { + order += "StopProducer"; + } + } + + private class MyConsumer extends DefaultConsumer { + + public MyConsumer(Endpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + protected void doStart() throws Exception { + order += "Consumer"; + } + + @Override + protected void doStop() throws Exception { + order += "StopConsumer"; + } + } +} Modified: camel/trunk/camel-core/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/resources/log4j.properties (original) +++ camel/trunk/camel-core/src/test/resources/log4j.properties Fri Jan 28 17:06:59 2011 @@ -47,6 +47,7 @@ log4j.logger.org.apache.camel.impl.Defau #log4j.logger.org.apache.camel.impl=TRACE #log4j.logger.org.apache.camel.util.FileUtil=TRACE #log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE +#log4j.logger.org.apache.camel.util.ServiceHelper=TRACE # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java Fri Jan 28 17:06:59 2011 @@ -67,11 +67,9 @@ public class SqsEndpoint extends Schedul public boolean isSingleton() { return true; } - + @Override - public void start() throws Exception { - super.start(); - + protected void doStart() throws Exception { client = getConfiguration().getAmazonSQSClient() != null ? getConfiguration().getAmazonSQSClient() : getClient(); @@ -92,12 +90,10 @@ public class SqsEndpoint extends Schedul } @Override - public void stop() throws Exception { + protected void doStop() throws Exception { client = null; - - super.stop(); } - + public Exchange createExchange(com.amazonaws.services.sqs.model.Message msg) { return createExchange(getExchangePattern(), msg); } Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original) +++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Fri Jan 28 17:06:59 2011 @@ -548,9 +548,10 @@ public class CxfEndpoint extends Default public boolean isLoggingFeatureEnabled() { return loggingFeatureEnabled; - } + } - public void start() throws Exception { + @Override + protected void doStart() throws Exception { if (headerFilterStrategy == null) { headerFilterStrategy = new CxfHeaderFilterStrategy(); } @@ -562,7 +563,8 @@ public class CxfEndpoint extends Default } } - public void stop() throws Exception { + @Override + protected void doStop() throws Exception { // noop } Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/cxfbean/CxfBeanEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/cxfbean/CxfBeanEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/cxfbean/CxfBeanEndpoint.java (original) +++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/cxfbean/CxfBeanEndpoint.java Fri Jan 28 17:06:59 2011 @@ -56,15 +56,17 @@ public class CxfBeanEndpoint extends Pro public CxfBeanEndpoint(String remaining, CxfBeanComponent component) { super(remaining, component); } - - public void stop() { - server.stop(); - } - - public void start() { + + @Override + protected void doStart() throws Exception { server.start(); } + @Override + protected void doStop() throws Exception { + server.stop(); + } + @SuppressWarnings("unchecked") public void init() { Object obj = CamelContextHelper.mandatoryLookup(getCamelContext(), getEndpointUri()); Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java (original) +++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java Fri Jan 28 17:06:59 2011 @@ -17,15 +17,11 @@ package org.apache.camel.component.cxf.spring; import java.util.List; - import javax.xml.ws.handler.Handler; import org.apache.cxf.BusFactory; import org.apache.cxf.frontend.AbstractServiceFactory; -import org.apache.cxf.frontend.AbstractWSDLBasedEndpointFactory; import org.apache.cxf.service.factory.ReflectionServiceFactoryBean; -import org.apache.cxf.wsdl11.WSDLEndpointFactory; - import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.NamedBean; @@ -57,7 +53,6 @@ public class CxfEndpointBean extends Abs // Clean up the BusFactory's defaultBus BusFactory.setDefaultBus(null); BusFactory.setThreadDefaultBus(null); - } public void setBeanName(String name) { Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Fri Jan 28 17:06:59 2011 @@ -47,7 +47,6 @@ import org.springframework.transaction.P import org.springframework.util.Assert; import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName; -import static org.apache.camel.util.ObjectHelper.removeStartingCharacters; /** * @version $Revision$ @@ -356,7 +355,7 @@ public class JmsConfiguration implements } public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) throws Exception { - AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation(); + AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation(endpoint); configureMessageListenerContainer(container, endpoint); return container; } @@ -960,12 +959,13 @@ public class JmsConfiguration implements } } - public AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() { + public AbstractMessageListenerContainer chooseMessageListenerContainerImplementation(JmsEndpoint endpoint) { switch (consumerType) { case Simple: + // TODO: simple is @deprecated and should be removed in Camel 2.7 when we upgrade to Spring 3 return new SimpleMessageListenerContainer(); case Default: - return new DefaultMessageListenerContainer(); + return new JmsMessageListenerContainer(endpoint); default: throw new IllegalArgumentException("Unknown consumer type: " + consumerType); } @@ -985,6 +985,7 @@ public class JmsConfiguration implements * @return the cache level */ protected int defaultCacheLevel(JmsEndpoint endpoint) { + // TODO: upgrade to Spring 3 // if we are on a new enough spring version we can assume CACHE_CONSUMER if (PackageHelper.isValidVersion("org.springframework.jms", 2.51D)) { return DefaultMessageListenerContainer.CACHE_CONSUMER; Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Fri Jan 28 17:06:59 2011 @@ -19,6 +19,7 @@ package org.apache.camel.component.jms; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.ExceptionListener; @@ -72,6 +73,7 @@ public class JmsEndpoint extends Default private ReplyManager replyManager; // scheduled executor to check for timeout (reply not received) private ScheduledExecutorService replyManagerExecutorService; + private final AtomicBoolean running = new AtomicBoolean(); public JmsEndpoint() { this(null, null); @@ -373,10 +375,22 @@ public class JmsEndpoint extends Default return replyManagerExecutorService; } - public void start() throws Exception { + /** + * State whether this endpoint is running (eg started) + */ + protected boolean isRunning() { + return running.get(); } - public void stop() throws Exception { + @Override + protected void doStart() throws Exception { + running.set(true); + } + + @Override + protected void doStop() throws Exception { + running.set(false); + if (replyManager != null) { ServiceHelper.stopService(replyManager); replyManager = null; Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java?rev=1064780&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java (added) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java Fri Jan 28 17:06:59 2011 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import org.springframework.jms.listener.DefaultMessageListenerContainer; + +/** + * The {@link DefaultMessageListenerContainer container} which listen for messages + * on the JMS destination. + * <p/> + * This implementation extends Springs {@link DefaultMessageListenerContainer} supporting + * automatic recovery and throttling. + * + * @version $Revision$ + */ +public class JmsMessageListenerContainer extends DefaultMessageListenerContainer { + + private final JmsEndpoint endpoint; + + public JmsMessageListenerContainer(JmsEndpoint endpoint) { + this.endpoint = endpoint; + } + + @Override + protected boolean runningAllowed() { + // do not run if we have been stopped + return endpoint.isRunning(); + } +} Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java Fri Jan 28 17:06:59 2011 @@ -79,8 +79,7 @@ public class NettyEndpoint extends Defau } @Override - public void start() throws Exception { - super.start(); + protected void doStart() throws Exception { ObjectHelper.notNull(timer, "timer"); } Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java (original) +++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java Fri Jan 28 17:06:59 2011 @@ -240,17 +240,21 @@ public class QuartzEndpoint extends Defa return new JobDetail(); } - public void start() throws Exception { + @Override + protected void doStart() throws Exception { ObjectHelper.notNull(getComponent(), "QuartzComponent", this); ServiceHelper.startService(loadBalancer); } - public void stop() throws Exception { + @Override + protected void doStop() throws Exception { ServiceHelper.stopService(loadBalancer); } - public void shutdown() throws Exception { + @Override + protected void doShutdown() throws Exception { ObjectHelper.notNull(trigger, "trigger"); deleteTrigger(getTrigger()); } + } Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java (original) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java Fri Jan 28 17:06:59 2011 @@ -27,6 +27,7 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.quickfixj.converter.QuickfixjConverters; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +43,7 @@ public class QuickfixjEndpoint extends D private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEndpoint.class); private SessionID sessionID; - private List<QuickfixjConsumer> consumers = new CopyOnWriteArrayList<QuickfixjConsumer>(); + private final List<QuickfixjConsumer> consumers = new CopyOnWriteArrayList<QuickfixjConsumer>(); public QuickfixjEndpoint(String uri, CamelContext context) { super(uri, context); @@ -59,8 +60,6 @@ public class QuickfixjEndpoint extends D public Consumer createConsumer(Processor processor) throws Exception { LOG.info("Creating QuickFIX/J consumer: " + (sessionID != null ? sessionID : "No Session")); QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor); - // TODO The lifecycle mgmt requirements aren't clear to me - consumer.start(); consumers.add(consumer); return consumer; } @@ -71,7 +70,6 @@ public class QuickfixjEndpoint extends D } public boolean isSingleton() { - // TODO This seems to be incorrect. There can be multiple consumers for a session endpoint. return true; } @@ -90,4 +88,10 @@ public class QuickfixjEndpoint extends D public boolean isMultipleConsumersSupported() { return true; } + + @Override + protected void doStop() throws Exception { + // clear list of consumers + consumers.clear(); + } } Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java (original) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java Fri Jan 28 17:06:59 2011 @@ -37,6 +37,7 @@ import org.apache.camel.component.quickf import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.impl.ServiceSupport; import org.apache.camel.impl.converter.StaticMethodTypeConverter; +import org.apache.camel.util.ServiceHelper; import org.apache.mina.common.TransportType; import org.junit.After; import org.junit.Before; @@ -224,7 +225,8 @@ public class QuickfixjComponentTest { } } }); - + ServiceHelper.startService(consumer); + // Endpoint automatically starts the consumer assertThat(((ServiceSupport)consumer).isStarted(), is(true)); @@ -268,7 +270,7 @@ public class QuickfixjComponentTest { final CountDownLatch logonLatch = new CountDownLatch(2); final CountDownLatch messageLatch = new CountDownLatch(2); - endpoint.createConsumer(new Processor() { + Consumer consumer = endpoint.createConsumer(new Processor() { public void process(Exchange exchange) throws Exception { QuickfixjEventCategory eventCategory = (QuickfixjEventCategory) exchange.getIn().getHeader(QuickfixjEndpoint.EVENT_CATEGORY_KEY); @@ -279,7 +281,8 @@ public class QuickfixjComponentTest { } } }); - + ServiceHelper.startService(consumer); + component.start(); assertTrue("Session not created", logonLatch.await(5000, TimeUnit.MILLISECONDS)); Modified: camel/trunk/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java (original) +++ camel/trunk/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java Fri Jan 28 17:06:59 2011 @@ -23,7 +23,6 @@ import org.apache.camel.Consumer; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.Service; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategyAware; @@ -34,7 +33,7 @@ import org.restlet.data.Method; * * @version $Revision$ */ -public class RestletEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, Service { +public class RestletEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { private static final int DEFAULT_PORT = 80; private static final String DEFAULT_PROTOCOL = "http"; @@ -179,7 +178,16 @@ public class RestletEndpoint extends Def return restletUriPatterns; } - public void start() throws Exception { + public boolean isThrowExceptionOnFailure() { + return throwExceptionOnFailure; + } + + public void setThrowExceptionOnFailure(boolean throwExceptionOnFailure) { + this.throwExceptionOnFailure = throwExceptionOnFailure; + } + + @Override + protected void doStart() throws Exception { if (headerFilterStrategy == null) { headerFilterStrategy = new RestletHeaderFilterStrategy(); } @@ -191,15 +199,9 @@ public class RestletEndpoint extends Def } } - public void stop() throws Exception { + @Override + protected void doStop() throws Exception { // noop } - public boolean isThrowExceptionOnFailure() { - return throwExceptionOnFailure; - } - - public void setThrowExceptionOnFailure(boolean throwExceptionOnFailure) { - this.throwExceptionOnFailure = throwExceptionOnFailure; - } } Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/test/TestEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/test/TestEndpoint.java?rev=1064780&r1=1064779&r2=1064780&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/test/TestEndpoint.java (original) +++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/test/TestEndpoint.java Fri Jan 28 17:06:59 2011 @@ -45,7 +45,8 @@ public class TestEndpoint extends MockEn this.expectedMessageEndpoint = expectedMessageEndpoint; } - public void start() throws Exception { + @Override + protected void doStart() throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Consuming expected messages from: " + expectedMessageEndpoint); } @@ -62,9 +63,6 @@ public class TestEndpoint extends MockEn } expectedBodiesReceived(expectedBodies); } - - public void stop() throws Exception { - } /** * This method allows us to convert or coerce the expected message body into some other type