Author: davsclaus
Date: Wed Jan 27 20:20:20 2010
New Revision: 903817

URL: http://svn.apache.org/viewvc?rev=903817&view=rev
Log:
CAMEL-2398: Fixing startup order warmup route services.

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteServicesStartupOrderTest.java - copied, changed from r903221, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=903817&r1=903816&r2=903817&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Wed Jan 27 20:20:20 2010 @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -141,7 +142,7 @@ private FactoryFinderResolver factoryFinderResolver = new DefaultFactoryFinderResolver(); private FactoryFinder defaultFactoryFinder; private final Map<String, FactoryFinder> factories = new HashMap<String, FactoryFinder>(); - private final Map<String, RouteService> routeServices = new HashMap<String, RouteService>(); + private final Map<String, RouteService> routeServices = new LinkedHashMap<String, RouteService>(); private ClassResolver classResolver = new DefaultClassResolver(); private PackageScanClassResolver packageScanClassResolver; // we use a capacity of 100 per endpoint, so for the same endpoint we have at most 100 producers in the pool @@ -909,14 +910,7 @@ for (RouteService routeService : routeServices.values()) { Boolean autoStart = routeService.getRouteDefinition().isAutoStartup(); if 
(autoStart == null || autoStart) { - // defer starting inputs till later as we want to prepare the routes by starting - // all their processors and child services etc. - // then later we open the floods to Camel by starting the inputs - // what this does is to ensure Camel is more robust on starting routes as all routes - // will then be prepared in time before we start inputs which will consume messages to be routed - routeService.startInputs(false); try { - routeService.start(); // add the inputs from this route service to the list to start afterwards // should be ordered according to the startup number Integer startupOrder = routeService.getRouteDefinition().getStartupOrder(); @@ -926,14 +920,8 @@ } // create holder object that contains information about this route to be started - DefaultRouteStartupOrder holder = null; - for (Map.Entry<Route, Consumer> entry : routeService.getInputs().entrySet()) { - if (holder == null) { - holder = new DefaultRouteStartupOrder(startupOrder, entry.getKey()); - } - // add the input consumer to the holder - holder.addInput(entry.getValue()); - } + Route route = routeService.getRoutes().iterator().next(); + DefaultRouteStartupOrder holder = new DefaultRouteStartupOrder(startupOrder, route, routeService); // check for clash by startupOrder id DefaultRouteStartupOrder other = inputs.get(startupOrder); @@ -949,8 +937,6 @@ throw e; } catch (Exception e) { throw new FailedToStartRouteException(e); - } finally { - routeService.startInputs(true); } } else { // should not start on startup @@ -958,9 +944,28 @@ } } + // now prepare the routes by starting its services before we start the input + for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) { + // defer starting inputs till later as we want to prepare the routes by starting + // all their processors and child services etc. 
+ // then later we open the floods to Camel by starting the inputs + // what this does is to ensure Camel is more robust on starting routes as all routes + // will then be prepared in time before we start inputs which will consume messages to be routed + RouteService routeService = entry.getValue().getRouteService(); + routeService.startInputs(false); + try { + routeService.start(); + } finally { + routeService.startInputs(true); + } + } + // check for clash with multiple consumers of the same endpoints which is not allowed List<Endpoint> routeInputs = new ArrayList<Endpoint>(); - for (RouteService routeService : routeServices.values()) { + for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) { + Integer order = entry.getKey(); + Route route = entry.getValue().getRoute(); + RouteService routeService = entry.getValue().getRouteService(); for (Consumer consumer : routeService.getInputs().values()) { Endpoint endpoint = consumer.getEndpoint(); @@ -974,28 +979,20 @@ throw new FailedToStartRouteException(routeService.getId(), "Multiple consumers for the same endpoint is not allowed: " + endpoint); } else { + // start the + if (LOG.isDebugEnabled()) { + LOG.debug("Starting consumer (order: " + order + ") on route: " + route.getId()); + } + for (LifecycleStrategy strategy : lifecycleStrategies) { + strategy.onServiceAdd(this, consumer, route); + } + ServiceHelper.startService(consumer); + routeInputs.add(endpoint); - } - } - } - // now start the inputs for all the route services as we have prepared Camel - // yeah open the floods so messages can start flow into Camel - for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) { - Integer order = entry.getKey(); - Route route = entry.getValue().getRoute(); - List<Consumer> consumers = entry.getValue().getInputs(); - for (Consumer consumer : consumers) { - if (LOG.isDebugEnabled()) { - LOG.debug("Starting consumer (order: " + order + ") on route: " + route.getId()); + // add to the 
order which they was started, so we know how to stop them in reverse order + routeStartupOrder.add(entry.getValue()); } - for (LifecycleStrategy strategy : lifecycleStrategies) { - strategy.onServiceAdd(this, consumer, route); - } - ServiceHelper.startService(consumer); - - // add to the order which they was started, so we know how to stop them in reverse order - routeStartupOrder.add(entry.getValue()); } } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java?rev=903817&r1=903816&r2=903817&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java Wed Jan 27 20:20:20 2010 @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.camel.Consumer; import org.apache.camel.Route; @@ -33,14 +34,12 @@ private final int startupOrder; private final Route route; private final List<Consumer> inputs = new ArrayList<Consumer>(); + private final RouteService routeService; - public DefaultRouteStartupOrder(int startupOrder, Route route) { + public DefaultRouteStartupOrder(int startupOrder, Route route, RouteService routeService) { this.startupOrder = startupOrder; this.route = route; - } - - void addInput(Consumer input) { - inputs.add(input); + this.routeService = routeService; } public int getStartupOrder() { @@ -52,7 +51,16 @@ } public List<Consumer> getInputs() { - return inputs; + List<Consumer> answer = new ArrayList<Consumer>(); + Map<Route, Consumer> inputs = routeService.getInputs(); + for (Consumer consumer : inputs.values()) { + answer.add(consumer); + } + return answer; + } + + public RouteService getRouteService() { + return 
routeService; } @Override Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteServicesStartupOrderTest.java (from r903221, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteServicesStartupOrderTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteServicesStartupOrderTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java&r1=903221&r2=903817&rev=903817&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteServicesStartupOrderTest.java Wed Jan 27 20:20:20 2010 @@ -19,6 +19,9 @@ import java.util.List; import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Service; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.DefaultCamelContext; @@ -27,9 +30,16 @@ /** * @version $Revision$ */ -public class RouteStartupOrderTest extends ContextTestSupport { +public class RouteServicesStartupOrderTest extends ContextTestSupport { - public void testRouteStartupOrder() throws Exception { + private static String startOrder = ""; + + private MyServiceBean service1 = new MyServiceBean("1"); + private MyServiceBean service2 = new MyServiceBean("2"); + private MyServiceBean service3 = new MyServiceBean("3"); + private MyServiceBean service4 = new MyServiceBean("4"); + + public void testRouteServiceStartupOrder() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); @@ -46,6 +56,10 @@ assertEquals("direct://start", 
order.get(1).getRoute().getEndpoint().getEndpointUri()); assertEquals("seda://bar", order.get(2).getRoute().getEndpoint().getEndpointUri()); assertEquals("direct://bar", order.get(3).getRoute().getEndpoint().getEndpointUri()); + + // assert route service was started in order as well + assertEquals("22114433", startOrder); + } @Override @@ -53,14 +67,48 @@ return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").startupOrder(2).to("seda:foo"); + from("direct:start").startupOrder(2).process(service1).to("seda:foo"); - from("seda:foo").startupOrder(1).to("mock:result"); + from("seda:foo").startupOrder(1).process(service2).to("mock:result"); - from("direct:bar").startupOrder(9).to("seda:bar"); + from("direct:bar").startupOrder(9).process(service3).to("seda:bar"); - from("seda:bar").startupOrder(5).to("mock:other"); + from("seda:bar").startupOrder(5).process(service4).to("mock:other"); } }; } + + public class MyServiceBean implements Processor, Service { + + private String name; + private boolean started; + + public MyServiceBean(String name) { + this.name = name; + } + + public void start() throws Exception { + startOrder += name; + started = true; + } + + public void stop() throws Exception { + started = false; + } + + public boolean isStarted() { + return started; + } + + public String getName() { + return name; + } + + public void setStarted(boolean started) { + this.started = started; + } + + public void process(Exchange exchange) throws Exception { + } + } } \ No newline at end of file