Author: davsclaus Date: Fri Jun 25 07:02:27 2010 New Revision: 957823 URL: http://svn.apache.org/viewvc?rev=957823&view=rev Log: CAMEL-2855: Introduced StartupListener to have quartz scheduler start when the CamelContext has been started, which ensures all routes etc. have been started before the scheduler.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=957823&r1=957822&r2=957823&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Fri Jun 25 07:02:27 2010 @@ -102,6 +102,15 @@ public interface CamelContext extends Se */ boolean hasService(Object object); + /** + * Adds the given listener to be invoked when {@link CamelContext} have just been started. + * <p/> + * This allows listeners to do any custom work after the routes and other services have been started and are running.
+ * + * @param listener the listener + */ + void addStartupListener(StartupListener listener); + // Component Management Methods //----------------------------------------------------------------------- Added: camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java?rev=957823&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java Fri Jun 25 07:02:27 2010 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +/** + * Allows objects to be notified when {@link CamelContext} have just been started. + * <p/> + * This can be used to perform any custom work when the entire {@link CamelContext} has been initialized and started. + * For example this ensures that all the Camel routes has been started and are up and running, before this callback + * is being invoked.
+ * <p/> + * For example the QuartzComponent leverages this to ensure the Quartz scheduler is started late, when all the + * Camel routes and services already have been started. + * + * @version $Revision$ + */ +public interface StartupListener { + + /** + * Callback invoked when the {@link org.apache.camel.CamelContext} has just been started. + * + * @param context the camel context + * @throws Exception can be thrown in case of errors to fail the startup process and have the application + * fail on startup. + */ + void onCamelContextStarted(CamelContext context) throws Exception; +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=957823&r1=957822&r2=957823&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri Jun 25 07:02:27 2010 @@ -24,8 +24,10 @@ import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -53,6 +55,7 @@ import org.apache.camel.Service; import org.apache.camel.ServiceStatus; import org.apache.camel.ShutdownRoute; import
org.apache.camel.ShutdownRunningTask; +import org.apache.camel.StartupListener; import org.apache.camel.TypeConverter; import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.component.properties.PropertiesComponent; @@ -123,6 +126,7 @@ public class DefaultCamelContext extends private final Map<String, Component> components = new HashMap<String, Component>(); private List<Route> routes; private final List<Service> servicesToClose = new ArrayList<Service>(); + private final Set<StartupListener> startupListeners = new LinkedHashSet<StartupListener>(); private TypeConverter typeConverter; private TypeConverterRegistry typeConverterRegistry; private Injector injector; @@ -136,7 +140,7 @@ public class DefaultCamelContext extends private AtomicBoolean managementStrategyInitialized = new AtomicBoolean(false); private final List<RouteDefinition> routeDefinitions = new ArrayList<RouteDefinition>(); private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>(); - + private boolean firstStartDone; private Boolean autoStartup = Boolean.TRUE; private Boolean trace = Boolean.FALSE; @@ -608,7 +612,7 @@ public class DefaultCamelContext extends } startServices(object); } - + public boolean hasService(Object object) { if (object instanceof Service) { return servicesToClose.contains(object); @@ -616,6 +620,10 @@ public class DefaultCamelContext extends return false; } + public void addStartupListener(StartupListener listener) { + startupListeners.add(listener); + } + // Helper methods // ----------------------------------------------------------------------- @@ -864,7 +872,7 @@ public class DefaultCamelContext extends answer.setMaximumCacheSize(maximumCacheSize); // start it so its ready to use try { - answer.start(); + startServices(answer); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -881,7 +889,7 @@ public class DefaultCamelContext extends answer.setMaximumCacheSize(maximumCacheSize); // start it so 
its ready to use try { - answer.start(); + startServices(answer); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -939,6 +947,11 @@ public class DefaultCamelContext extends } } + // now notify any startup aware listeners as all the routes etc has been started. + for (StartupListener startup : startupListeners) { + startup.onCamelContextStarted(this); + } + stopWatch.stop(); if (LOG.isInfoEnabled()) { LOG.info("Started " + getRoutes().size() + " routes"); @@ -1079,10 +1092,11 @@ public class DefaultCamelContext extends // shutdown services as late as possible shutdownServices(servicesToClose); servicesToClose.clear(); + startupListeners.clear(); // must notify that we are stopped before stopping the management strategy EventHelper.notifyCamelContextStopped(this); - + // stop the notifier service for (EventNotifier notifier : getManagementStrategy().getEventNotifiers()) { shutdownServices(notifier); @@ -1133,6 +1147,20 @@ public class DefaultCamelContext extends } private void startServices(Object service) throws Exception { + // it can be a collection so ensure we look inside it + if (service instanceof Collection) { + for (Object element : (Collection)service) { + startServices(element); + } + } + + // and register startup aware so they can be notified when + // camel context has been started + if (service instanceof StartupListener) { + startupListeners.add((StartupListener) service); + } + + // and then start the service ServiceHelper.startService(service); } @@ -1301,7 +1329,7 @@ public class DefaultCamelContext extends for (LifecycleStrategy strategy : lifecycleStrategies) { strategy.onServiceAdd(this, consumer, route); } - ServiceHelper.startService(consumer); + startServices(consumer); routeInputs.add(endpoint); @@ -1664,7 +1692,7 @@ public class DefaultCamelContext extends /** * Reset CONTEXT_COUNTER to a preset value. 
Mostly used for tests to ensure a predictable getName() - * + * * @param value new value for the CONTEXT_COUNTER */ public static void setContextCounter(int value) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=957823&r1=957822&r2=957823&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java Fri Jun 25 07:02:27 2010 @@ -132,14 +132,6 @@ public class RoutingSlip extends Service return doRoutingSlip(exchange, routingSlip, callback); } - public boolean doRoutingSlip(Exchange exchange, Object routingSlip) { - return doRoutingSlip(exchange, routingSlip, new AsyncCallback() { - public void done(boolean doneSync) { - // noop - } - }); - } - public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) { Iterator<Object> iter = ObjectHelper.createIterator(routingSlip, uriDelimiter); Exchange current = exchange; Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java?rev=957823&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java Fri Jun 25 07:02:27 2010 @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.StartupListener; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.direct.DirectComponent; + +/** + * @version $Revision$ + */ +public class StartupListenerComponentFromRegistryTest extends ContextTestSupport { + + private MyComponent my; + + @Override + protected JndiRegistry createRegistry() throws Exception { + my = new MyComponent(); + + JndiRegistry jndi = super.createRegistry(); + jndi.bind("my", my); + return jndi; + } + + public void testStartupListenerComponent() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:foo", "Hello World"); + + assertMockEndpointsSatisfied(); + + assertEquals(1, my.getInvoked()); + } + + private class MyComponent extends DirectComponent implements StartupListener { + + private int invoked; + + public void onCamelContextStarted(CamelContext context) throws Exception { + invoked++; + + // the routes should have been started + assertTrue(context.getRouteStatus("foo").isStarted()); + assertTrue(context.getRouteStatus("bar").isStarted()); + } + + public int getInvoked() { + return invoked; + } + } + + @Override + 
protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:foo").routeId("foo").to("my:bar"); + from("my:bar").routeId("bar").to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java?rev=957823&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java Fri Jun 25 07:02:27 2010 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.StartupListener; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.direct.DirectComponent; + +/** + * @version $Revision$ + */ +public class StartupListenerComponentTest extends ContextTestSupport { + + private MyComponent my; + + public void testStartupListenerComponent() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:foo", "Hello World"); + + assertMockEndpointsSatisfied(); + + assertEquals(1, my.getInvoked()); + } + + private class MyComponent extends DirectComponent implements StartupListener { + + private int invoked; + + public void onCamelContextStarted(CamelContext context) throws Exception { + invoked++; + + // the routes should have been started + assertTrue(context.getRouteStatus("foo").isStarted()); + assertTrue(context.getRouteStatus("bar").isStarted()); + } + + public int getInvoked() { + return invoked; + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + my = new MyComponent(); + context.addComponent("my", my); + + from("direct:foo").routeId("foo").to("my:bar"); + from("my:bar").routeId("bar").to("mock:result"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java 
------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java?rev=957823&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java Fri Jun 25 07:02:27 2010 @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.StartupListener; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class StartupListenerTest extends ContextTestSupport { + + private MyStartupListener my = new MyStartupListener(); + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.addStartupListener(my); + return context; + } + + private class MyStartupListener implements StartupListener { + + private int invoked; + + public void onCamelContextStarted(CamelContext context) throws Exception { + invoked++; + + // the route should have been started + assertTrue(context.getRouteStatus("foo").isStarted()); + } + + public int getInvoked() { + return invoked; + } + } + + public void testStartupListenerComponent() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:foo", "Hello World"); + + assertMockEndpointsSatisfied(); + + assertEquals(1, my.getInvoked()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:foo").routeId("foo").to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java?rev=957823&r1=957822&r2=957823&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java (original) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java Fri Jun 25 07:02:27 2010 @@ -24,22 +24,20 @@ import java.util.Map; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; -import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.Message; import org.apache.camel.component.http.HttpMethods; import org.apache.camel.component.http.helper.HttpProducerHelper; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.io.ByteArrayBuffer; /** @@ -61,74 +59,28 @@ public class JettyHttpProducer extends D } public void process(Exchange exchange) throws Exception { - HttpClient client = getEndpoint().getClient(); - - JettyContentExchange httpExchange = createHttpExchange(exchange); - sendSynchronous(exchange, client, httpExchange); + AsyncProcessorHelper.process(this, exchange); } public boolean process(Exchange exchange, final AsyncCallback callback) { HttpClient client = getEndpoint().getClient(); try { - JettyContentExchange httpExchange = createHttpExchange(exchange); - 
sendAsynchronous(exchange, client, httpExchange, callback); + JettyContentExchange httpExchange = createHttpExchange(exchange, callback); + doSendExchange(client, httpExchange); } catch (Exception e) { + // error occurred before we had a chance to go async + // so set exception and invoke callback true exchange.setException(e); + callback.done(true); + return true; } // we should continue processing this asynchronously return false; } - protected void sendAsynchronous(final Exchange exchange, final HttpClient client, final JettyContentExchange httpExchange, - final AsyncCallback callback) throws IOException { - - // set the callback for the async mode - httpExchange.setCallback(callback); - - doSendExchange(client, httpExchange); - - // the callback will handle all the response handling logic - } - - protected void sendSynchronous(Exchange exchange, HttpClient client, JettyContentExchange httpExchange) throws Exception { - doSendExchange(client, httpExchange); - - // we send synchronous so wait for it to be done - // must use our own lock detection as Jettys waitForDone will wait forever in case of connection issues - try { - if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for HTTP exchange to be done"); - } - int exchangeState = httpExchange.waitForDoneOrFailure(); - if (LOG.isTraceEnabled()) { - LOG.trace("HTTP exchange is done with state " + exchangeState); - } - if (exchangeState == HttpExchange.STATUS_COMPLETED) { - // process the response as the state is ok - getBinding().populateResponse(exchange, httpExchange); - } else if (exchangeState == HttpExchange.STATUS_EXPIRED) { - // we did timeout - throw new ExchangeTimedOutException(exchange, client.getTimeout()); - } else { - // some kind of other error - if (exchange.getException() != null) { - throw exchange.getException(); - } else { - exchange.setException(new CamelExchangeException("JettyClient failed with state " + exchangeState, exchange)); - } - } - } catch (InterruptedException e) { - // are we 
shutting down? - if (LOG.isDebugEnabled()) { - LOG.debug("Interrupted waiting for async reply, are we stopping? " + (isStopping() || isStopped())); - } - exchange.setException(e); - } - } - - protected JettyContentExchange createHttpExchange(Exchange exchange) throws Exception { + protected JettyContentExchange createHttpExchange(Exchange exchange, AsyncCallback callback) throws Exception { String url = HttpProducerHelper.createURL(exchange, getEndpoint()); HttpMethods methodToUse = HttpProducerHelper.createMethod(exchange, getEndpoint(), exchange.getIn().getBody() != null); String method = methodToUse.createMethod(url).getName(); @@ -176,6 +128,8 @@ public class JettyHttpProducer extends D } } + // set the callback, which will handle all the response logic + httpExchange.setCallback(callback); return httpExchange; } Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java?rev=957823&r1=957822&r2=957823&view=diff ============================================================================== --- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java (original) +++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java Fri Jun 25 07:02:27 2010 @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.CamelContext; +import org.apache.camel.StartupListener; import org.apache.camel.impl.DefaultComponent; import org.apache.camel.util.IntrospectionSupport; import org.apache.camel.util.ObjectHelper; @@ -48,7 +49,7 @@ import org.quartz.impl.StdSchedulerFacto * * @version $Revision:520964 $ */ -public class QuartzComponent extends DefaultComponent { +public class QuartzComponent extends DefaultComponent implements 
StartupListener { private static final transient Log LOG = LogFactory.getLog(QuartzComponent.class); private static final AtomicInteger JOBS = new AtomicInteger(); private static Scheduler scheduler; @@ -122,6 +123,19 @@ public class QuartzComponent extends Def return cron; } + public void onCamelContextStarted(CamelContext camelContext) throws Exception { + // only start scheduler when CamelContext have finished starting + if (!scheduler.isStarted()) { + if (getStartDelayedSeconds() > 0) { + LOG.info("Starting Quartz scheduler: " + scheduler.getSchedulerName() + " delayed: " + getStartDelayedSeconds() + " seconds."); + scheduler.startDelayed(getStartDelayedSeconds()); + } else { + LOG.info("Starting Quartz scheduler: " + scheduler.getSchedulerName()); + scheduler.start(); + } + } + } + @Override protected void doStart() throws Exception { super.doStart(); @@ -135,7 +149,7 @@ public class QuartzComponent extends Def if (scheduler != null) { int number = JOBS.get(); if (number > 0) { - LOG.info("There are still " + number + " jobs registered in the Quartz scheduler: " + scheduler.getSchedulerName()); + LOG.info("Cannot shutdown Quartz scheduler: " + scheduler.getSchedulerName() + " as there are still " + number + " jobs registered."); } if (number == 0) { // no more jobs then shutdown the scheduler @@ -161,7 +175,6 @@ public class QuartzComponent extends Def } getScheduler().resumeTrigger(trigger.getName(), trigger.getGroup()); } - } public void removeJob(JobDetail job, Trigger trigger) throws SchedulerException { @@ -201,15 +214,6 @@ public class QuartzComponent extends Def if (scheduler == null) { scheduler = createScheduler(); } - if (!scheduler.isStarted()) { - if (getStartDelayedSeconds() > 0) { - LOG.info("Starting Quartz scheduler: " + scheduler.getSchedulerName() + " delayed: " + getStartDelayedSeconds() + " seconds."); - scheduler.startDelayed(getStartDelayedSeconds()); - } else { - LOG.info("Starting Quartz scheduler: " + scheduler.getSchedulerName()); - 
scheduler.start(); - } - } return scheduler; }