Author: davsclaus
Date: Fri Jun 25 07:02:27 2010
New Revision: 957823

URL: http://svn.apache.org/viewvc?rev=957823&view=rev
Log:
CAMEL-2855: Introduced StartupListener to have quartz scheduler start when the 
CamelContext have been started, which ensures all routes etc. have been started 
before the scheduler.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java  
 (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java
   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
    
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=957823&r1=957822&r2=957823&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Fri 
Jun 25 07:02:27 2010
@@ -102,6 +102,15 @@ public interface CamelContext extends Se
      */
     boolean hasService(Object object);
 
+    /**
+     * Adds the given listener to be invoked when {...@link CamelContext} have 
just been started.
+     * <p/>
+     * This allows listeners to do any custom work after the routes and other 
services have been started and are running.
+     *
+     * @param listener the listener
+     */
+    void addStartupListener(StartupListener listener);
+
     // Component Management Methods
     //-----------------------------------------------------------------------
 

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java?rev=957823&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java 
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java 
Fri Jun 25 07:02:27 2010
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Allows objects to be notified when {...@link CamelContext} have just been 
started.
+ * <p/>
+ * This can be used to perform any custom work when the entire {...@link 
CamelContext} has been initialized and started.
+ * For example this ensures that all the Camel routes has been started and are 
up and running, before this callback
+ * is being invoked.
+ * <p/>
+ * For example the QuartzComponent leverages this to ensure the Quartz 
scheduler is started late, when all the
+ * Camel routes and services already have been started.
+ *
+ * @version $Revision$
+ */
+public interface StartupListener {
+
+    /**
+     * Callback invoked when the {...@link org.apache.camel.CamelContext} has 
just been started.
+     *
+     * @param context the camel context
+     * @throws Exception can be thrown in case of errors to fail the startup 
process and have the application
+     * fail on startup.
+     */
+    void onCamelContextStarted(CamelContext context) throws Exception;
+}

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=957823&r1=957822&r2=957823&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
 Fri Jun 25 07:02:27 2010
@@ -24,8 +24,10 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,6 +55,7 @@ import org.apache.camel.Service;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.ShutdownRoute;
 import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.StartupListener;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.component.properties.PropertiesComponent;
@@ -123,6 +126,7 @@ public class DefaultCamelContext extends
     private final Map<String, Component> components = new HashMap<String, 
Component>();
     private List<Route> routes;
     private final List<Service> servicesToClose = new ArrayList<Service>();
+    private final Set<StartupListener> startupListeners = new 
LinkedHashSet<StartupListener>();
     private TypeConverter typeConverter;
     private TypeConverterRegistry typeConverterRegistry;
     private Injector injector;
@@ -136,7 +140,7 @@ public class DefaultCamelContext extends
     private AtomicBoolean managementStrategyInitialized = new 
AtomicBoolean(false);
     private final List<RouteDefinition> routeDefinitions = new 
ArrayList<RouteDefinition>();
     private List<InterceptStrategy> interceptStrategies = new 
ArrayList<InterceptStrategy>();
-    
+
     private boolean firstStartDone;
     private Boolean autoStartup = Boolean.TRUE;
     private Boolean trace = Boolean.FALSE;
@@ -608,7 +612,7 @@ public class DefaultCamelContext extends
         }
         startServices(object);
     }
-   
+
     public boolean hasService(Object object) {
         if (object instanceof Service) {
             return servicesToClose.contains(object);
@@ -616,6 +620,10 @@ public class DefaultCamelContext extends
         return false;
     }
 
+    public void addStartupListener(StartupListener listener) {
+        startupListeners.add(listener);
+    }
+
     // Helper methods
     // -----------------------------------------------------------------------
 
@@ -864,7 +872,7 @@ public class DefaultCamelContext extends
         answer.setMaximumCacheSize(maximumCacheSize);
         // start it so its ready to use
         try {
-            answer.start();
+            startServices(answer);
         } catch (Exception e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);
         }
@@ -881,7 +889,7 @@ public class DefaultCamelContext extends
         answer.setMaximumCacheSize(maximumCacheSize);
         // start it so its ready to use
         try {
-            answer.start();
+            startServices(answer);
         } catch (Exception e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);
         }
@@ -939,6 +947,11 @@ public class DefaultCamelContext extends
             }
         }
 
+        // now notify any startup aware listeners as all the routes etc has 
been started.
+        for (StartupListener startup : startupListeners) {
+            startup.onCamelContextStarted(this);
+        }
+
         stopWatch.stop();
         if (LOG.isInfoEnabled()) {
             LOG.info("Started " + getRoutes().size() + " routes");
@@ -1079,10 +1092,11 @@ public class DefaultCamelContext extends
         // shutdown services as late as possible
         shutdownServices(servicesToClose);
         servicesToClose.clear();
+        startupListeners.clear();
 
         // must notify that we are stopped before stopping the management 
strategy
         EventHelper.notifyCamelContextStopped(this);
-        
+
         // stop the notifier service
         for (EventNotifier notifier : 
getManagementStrategy().getEventNotifiers()) {
             shutdownServices(notifier);
@@ -1133,6 +1147,20 @@ public class DefaultCamelContext extends
     }
 
     private void startServices(Object service) throws Exception {
+        // it can be a collection so ensure we look inside it
+        if (service instanceof Collection) {
+            for (Object element : (Collection)service) {
+                startServices(element);
+            }
+        }
+
+        // and register startup aware so they can be notified when
+        // camel context has been started
+        if (service instanceof StartupListener) {
+            startupListeners.add((StartupListener) service);
+        }
+
+        // and then start the service
         ServiceHelper.startService(service);
     }
 
@@ -1301,7 +1329,7 @@ public class DefaultCamelContext extends
                 for (LifecycleStrategy strategy : lifecycleStrategies) {
                     strategy.onServiceAdd(this, consumer, route);
                 }
-                ServiceHelper.startService(consumer);
+                startServices(consumer);
 
                 routeInputs.add(endpoint);
 
@@ -1664,7 +1692,7 @@ public class DefaultCamelContext extends
 
     /**
      * Reset CONTEXT_COUNTER to a preset value. Mostly used for tests to 
ensure a predictable getName()
-     * 
+     *
      * @param value new value for the CONTEXT_COUNTER
      */
     public static void setContextCounter(int value) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=957823&r1=957822&r2=957823&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
 Fri Jun 25 07:02:27 2010
@@ -132,14 +132,6 @@ public class RoutingSlip extends Service
         return doRoutingSlip(exchange, routingSlip, callback);
     }
 
-    public boolean doRoutingSlip(Exchange exchange, Object routingSlip) {
-        return doRoutingSlip(exchange, routingSlip, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                // noop
-            }
-        });
-    }
-
     public boolean doRoutingSlip(Exchange exchange, Object routingSlip, 
AsyncCallback callback) {
         Iterator<Object> iter = ObjectHelper.createIterator(routingSlip, 
uriDelimiter);
         Exchange current = exchange;

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java?rev=957823&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java
 Fri Jun 25 07:02:27 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.StartupListener;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.direct.DirectComponent;
+
+/**
+ * @version $Revision$
+ */
+public class StartupListenerComponentFromRegistryTest extends 
ContextTestSupport {
+
+    private MyComponent my;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        my = new MyComponent();
+
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("my", my);
+        return jndi;
+    }
+
+    public void testStartupListenerComponent() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1, my.getInvoked());
+    }
+
+    private class MyComponent extends DirectComponent implements 
StartupListener {
+
+        private int invoked;
+
+        public void onCamelContextStarted(CamelContext context) throws 
Exception {
+            invoked++;
+
+            // the routes should have been started
+            assertTrue(context.getRouteStatus("foo").isStarted());
+            assertTrue(context.getRouteStatus("bar").isStarted());
+        }
+
+        public int getInvoked() {
+            return invoked;
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:foo").routeId("foo").to("my:bar");
+                from("my:bar").routeId("bar").to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentFromRegistryTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java?rev=957823&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java
 Fri Jun 25 07:02:27 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.StartupListener;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.direct.DirectComponent;
+
+/**
+ * @version $Revision$
+ */
+public class StartupListenerComponentTest extends ContextTestSupport {
+
+    private MyComponent my;
+
+    public void testStartupListenerComponent() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1, my.getInvoked());
+    }
+
+    private class MyComponent extends DirectComponent implements 
StartupListener {
+
+        private int invoked;
+
+        public void onCamelContextStarted(CamelContext context) throws 
Exception {
+            invoked++;
+
+            // the routes should have been started
+            assertTrue(context.getRouteStatus("foo").isStarted());
+            assertTrue(context.getRouteStatus("bar").isStarted());
+        }
+
+        public int getInvoked() {
+            return invoked;
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                my = new MyComponent();
+                context.addComponent("my", my);
+
+                from("direct:foo").routeId("foo").to("my:bar");
+                from("my:bar").routeId("bar").to("mock:result");
+            }
+        };
+    }
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerComponentTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java?rev=957823&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java
 Fri Jun 25 07:02:27 2010
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.StartupListener;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class StartupListenerTest extends ContextTestSupport {
+
+    private MyStartupListener my = new MyStartupListener();
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.addStartupListener(my);
+        return context;
+    }
+
+    private class MyStartupListener implements StartupListener {
+
+        private int invoked;
+
+        public void onCamelContextStarted(CamelContext context) throws 
Exception {
+            invoked++;
+
+            // the route should have been started
+            assertTrue(context.getRouteStatus("foo").isStarted());
+        }
+
+        public int getInvoked() {
+            return invoked;
+        }
+    }
+
+    public void testStartupListenerComponent() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1, my.getInvoked());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:foo").routeId("foo").to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartupListenerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java?rev=957823&r1=957822&r2=957823&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
 Fri Jun 25 07:02:27 2010
@@ -24,22 +24,20 @@ import java.util.Map;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
-import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Message;
 import org.apache.camel.component.http.HttpMethods;
 import org.apache.camel.component.http.helper.HttpProducerHelper;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpExchange;
 import org.eclipse.jetty.io.ByteArrayBuffer;
 
 /**
@@ -61,74 +59,28 @@ public class JettyHttpProducer extends D
     }
 
     public void process(Exchange exchange) throws Exception {
-        HttpClient client = getEndpoint().getClient();
-
-        JettyContentExchange httpExchange = createHttpExchange(exchange);
-        sendSynchronous(exchange, client, httpExchange);
+        AsyncProcessorHelper.process(this, exchange);
     }
 
     public boolean process(Exchange exchange, final AsyncCallback callback) {
         HttpClient client = getEndpoint().getClient();
 
         try {
-            JettyContentExchange httpExchange = createHttpExchange(exchange);
-            sendAsynchronous(exchange, client, httpExchange, callback);
+            JettyContentExchange httpExchange = createHttpExchange(exchange, 
callback);
+            doSendExchange(client, httpExchange);
         } catch (Exception e) {
+            // error occurred before we had a chance to go async
+            // so set exception and invoke callback true
             exchange.setException(e);
+            callback.done(true);
+            return true;
         }
 
         // we should continue processing this asynchronously
         return false;
     }
 
-    protected void sendAsynchronous(final Exchange exchange, final HttpClient 
client, final JettyContentExchange httpExchange,
-                                    final AsyncCallback callback) throws 
IOException {
-
-        // set the callback for the async mode
-        httpExchange.setCallback(callback);
-
-        doSendExchange(client, httpExchange);
-
-        // the callback will handle all the response handling logic
-    }
-
-    protected void sendSynchronous(Exchange exchange, HttpClient client, 
JettyContentExchange httpExchange) throws Exception {
-        doSendExchange(client, httpExchange);
-
-        // we send synchronous so wait for it to be done
-        // must use our own lock detection as Jettys waitForDone will wait 
forever in case of connection issues
-        try {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Waiting for HTTP exchange to be done");
-            }
-            int exchangeState = httpExchange.waitForDoneOrFailure();
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("HTTP exchange is done with state " + exchangeState);
-            }
-            if (exchangeState == HttpExchange.STATUS_COMPLETED) {
-                // process the response as the state is ok
-                getBinding().populateResponse(exchange, httpExchange);
-            } else if (exchangeState == HttpExchange.STATUS_EXPIRED) {
-                // we did timeout
-                throw new ExchangeTimedOutException(exchange, 
client.getTimeout());
-            } else {
-                // some kind of other error
-                if (exchange.getException() != null) {
-                    throw exchange.getException();
-                } else {
-                    exchange.setException(new 
CamelExchangeException("JettyClient failed with state " + exchangeState, 
exchange));
-                }
-            }
-        } catch (InterruptedException e) {
-            // are we shutting down?
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Interrupted waiting for async reply, are we 
stopping? " + (isStopping() || isStopped()));
-            }
-            exchange.setException(e);
-        }
-    }
-
-    protected JettyContentExchange createHttpExchange(Exchange exchange) 
throws Exception {
+    protected JettyContentExchange createHttpExchange(Exchange exchange, 
AsyncCallback callback) throws Exception {
         String url = HttpProducerHelper.createURL(exchange, getEndpoint());
         HttpMethods methodToUse = HttpProducerHelper.createMethod(exchange, 
getEndpoint(), exchange.getIn().getBody() != null);
         String method = methodToUse.createMethod(url).getName();
@@ -176,6 +128,8 @@ public class JettyHttpProducer extends D
             }
         }
 
+        // set the callback, which will handle all the response logic
+        httpExchange.setCallback(callback);
         return httpExchange;
     }
 

Modified: 
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java?rev=957823&r1=957822&r2=957823&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
 (original)
+++ 
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
 Fri Jun 25 07:02:27 2010
@@ -26,6 +26,7 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.StartupListener;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.ObjectHelper;
@@ -48,7 +49,7 @@ import org.quartz.impl.StdSchedulerFacto
  *
  * @version $Revision:520964 $
  */
-public class QuartzComponent extends DefaultComponent {
+public class QuartzComponent extends DefaultComponent implements 
StartupListener {
     private static final transient Log LOG = 
LogFactory.getLog(QuartzComponent.class);
     private static final AtomicInteger JOBS = new AtomicInteger();
     private static Scheduler scheduler;
@@ -122,6 +123,19 @@ public class QuartzComponent extends Def
         return cron;
     }
 
+    public void onCamelContextStarted(CamelContext camelContext) throws 
Exception {
+        // only start scheduler when CamelContext have finished starting
+        if (!scheduler.isStarted()) {
+            if (getStartDelayedSeconds() > 0) {
+                LOG.info("Starting Quartz scheduler: " + 
scheduler.getSchedulerName() + " delayed: " + getStartDelayedSeconds() + " 
seconds.");
+                scheduler.startDelayed(getStartDelayedSeconds());
+            } else {
+                LOG.info("Starting Quartz scheduler: " + 
scheduler.getSchedulerName());
+                scheduler.start();
+            }
+        }
+    }
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -135,7 +149,7 @@ public class QuartzComponent extends Def
         if (scheduler != null) {
             int number = JOBS.get();
             if (number > 0) {
-                LOG.info("There are still " + number + " jobs registered in 
the Quartz scheduler: " + scheduler.getSchedulerName());
+                LOG.info("Cannot shutdown Quartz scheduler: " + 
scheduler.getSchedulerName() + " as there are still " + number + " jobs 
registered.");
             }
             if (number == 0) {
                 // no more jobs then shutdown the scheduler
@@ -161,7 +175,6 @@ public class QuartzComponent extends Def
             }
             getScheduler().resumeTrigger(trigger.getName(), 
trigger.getGroup());
         }
-
     }
 
     public void removeJob(JobDetail job, Trigger trigger) throws 
SchedulerException {
@@ -201,15 +214,6 @@ public class QuartzComponent extends Def
         if (scheduler == null) {
             scheduler = createScheduler();
         }
-        if (!scheduler.isStarted()) {
-            if (getStartDelayedSeconds() > 0) {
-                LOG.info("Starting Quartz scheduler: " + 
scheduler.getSchedulerName() + " delayed: " + getStartDelayedSeconds() + " 
seconds.");
-                scheduler.startDelayed(getStartDelayedSeconds());
-            } else {
-                LOG.info("Starting Quartz scheduler: " + 
scheduler.getSchedulerName());
-                scheduler.start();
-            }
-        }
         return scheduler;
     }
 


Reply via email to