Repository: camel Updated Branches: refs/heads/master d901b59e8 -> 43ca13886
CAMEL-11588: SupervisingRouteController - Routes may be started in wrong order Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0be3d902 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0be3d902 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0be3d902 Branch: refs/heads/master Commit: 0be3d9025dbbc3095e2553190e6740816bed1fdb Parents: d901b59 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Fri Aug 4 16:43:15 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Tue Aug 8 13:29:24 2017 +0200 ---------------------------------------------------------------------- .../camel/impl/SupervisingRouteController.java | 299 ++++++++++++------- 1 file changed, 187 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0be3d902/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java index 1f92653..aaa6796 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java +++ b/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java @@ -17,7 +17,6 @@ package org.apache.camel.impl; import java.util.Collection; -import java.util.Comparator; import java.util.EventObject; import java.util.HashMap; import java.util.List; @@ -31,20 +30,25 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Experimental; import org.apache.camel.Route; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.ServiceStatus; import org.apache.camel.StartupListener; import org.apache.camel.management.event.CamelContextStartedEvent; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.spi.HasId; +import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.RouteController; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.support.EventNotifierSupport; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.backoff.BackOff; import org.apache.camel.util.backoff.BackOffContext; import org.apache.camel.util.backoff.BackOffTimer; @@ -63,8 +67,8 @@ public class SupervisingRouteController extends DefaultRouteController { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisingRouteController.class); private final Object lock; private final AtomicBoolean contextStarted; - private final Set<Route> startedRoutes; - private final Set<Route> stoppedRoutes; + private final AtomicInteger routeCount; + private final Set<RouteHolder> routes; private final CamelContextStartupListener listener; private final RouteManager routeManager; private BackOffTimer timer; @@ -73,14 +77,10 @@ public class SupervisingRouteController extends DefaultRouteController { private Map<String, BackOff> backOffConfigurations; public SupervisingRouteController() { - final Comparator<Route> comparator = Comparator.comparing( - route -> Optional.ofNullable(route.getRouteContext().getRoute().getStartupOrder()).orElse(Integer.MIN_VALUE) - ); - this.lock = new Object(); this.contextStarted = new AtomicBoolean(false); - this.stoppedRoutes = new TreeSet<>(comparator); - this.startedRoutes = new TreeSet<>(comparator.reversed()); + this.routeCount = new AtomicInteger(0); + this.routes = new TreeSet<>(); this.routeManager = new RouteManager(); this.defaultBackOff = BackOff.builder().build(); this.backOffConfigurations = new HashMap<>(); @@ -160,96 +160,103 @@ public class SupervisingRouteController extends DefaultRouteController { @Override public void startRoute(String routeId) throws Exception { - final CamelContext context = getCamelContext(); - final Route route = context.getRoute(routeId); + final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst(); - if (route == null) { - return; + if (!route.isPresent()) { + // This route is unknown to this controller, apply default behaviour + // from super class. + super.startRoute(routeId); + } else { + doStartRoute(route.get(), true, r -> super.startRoute(routeId)); } - - doStartRoute(context, route, true, r -> super.startRoute(routeId)); } @Override public void stopRoute(String routeId) throws Exception { - final CamelContext context = getCamelContext(); - final Route route = context.getRoute(routeId); + final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst(); - if (route == null) { - return; + if (!route.isPresent()) { + // This route is unknown to this controller, apply default behaviour + // from super class. + super.stopRoute(routeId); + } else { + doStopRoute(route.get(), true, r -> super.stopRoute(routeId)); } - - doStopRoute(context, route, true, r -> super.stopRoute(routeId)); } @Override public void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { - final CamelContext context = getCamelContext(); - final Route route = context.getRoute(routeId); + final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst(); - if (route == null) { - return; + if (!route.isPresent()) { + // This route is unknown to this controller, apply default behaviour + // from super class. + super.stopRoute(routeId, timeout, timeUnit); + } else { + doStopRoute(route.get(), true, r -> super.stopRoute(r.getId(), timeout, timeUnit)); } - - doStopRoute(context, route, true, r -> super.stopRoute(r.getId(), timeout, timeUnit)); } @Override public boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception { - final CamelContext context = getCamelContext(); - final Route route = context.getRoute(routeId); - final AtomicBoolean result = new AtomicBoolean(false); - - if (route == null) { - return false; - } + final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst(); - doStopRoute(context, route, true, r -> result.set(super.stopRoute(r.getId(), timeout, timeUnit, abortAfterTimeout))); + if (!route.isPresent()) { + // This route is unknown to this controller, apply default behaviour + // from super class. + return super.stopRoute(routeId, timeout, timeUnit, abortAfterTimeout); + } else { + final AtomicBoolean result = new AtomicBoolean(false); - return result.get(); + doStopRoute(route.get(), true, r -> result.set(super.stopRoute(r.getId(), timeout, timeUnit, abortAfterTimeout))); + return result.get(); + } } @Override public void suspendRoute(String routeId) throws Exception { - final CamelContext context = getCamelContext(); - final Route route = context.getRoute(routeId); + final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst(); - if (route == null) { - return; + if (!route.isPresent()) { + // This route is unknown to this controller, apply default behaviour + // from super class. + super.suspendRoute(routeId); + } else { + doStopRoute(route.get(), true, r -> super.suspendRoute(r.getId())); } - - doStopRoute(context, route, true, r -> super.suspendRoute(r.getId())); } @Override public void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { - final CamelContext context = getCamelContext(); - final Route route = context.getRoute(routeId); + final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst(); - if (route == null) { - return; + if (!route.isPresent()) { + // This route is unknown to this controller, apply default behaviour + // from super class. + super.suspendRoute(routeId, timeout, timeUnit); + } else { + doStopRoute(route.get(), true, r -> super.suspendRoute(r.getId(), timeout, timeUnit)); } - - doStopRoute(context, route, true, r -> super.suspendRoute(r.getId(), timeout, timeUnit)); } @Override public void resumeRoute(String routeId) throws Exception { - final CamelContext context = getCamelContext(); - final Route route = context.getRoute(routeId); + final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst(); - if (route == null) { - return; + if (!route.isPresent()) { + // This route is unknown to this controller, apply default behaviour + // from super class. + super.resumeRoute(routeId); + } else { + doStartRoute(route.get(), true, r -> super.startRoute(routeId)); } - - doStartRoute(context, route, true, r -> super.startRoute(routeId)); } // ********************************* // Helpers // ********************************* - private void doStopRoute(CamelContext context, Route route, boolean checker, ThrowingConsumer<Route, Exception> consumer) throws Exception { + private void doStopRoute(RouteHolder route, boolean checker, ThrowingConsumer<RouteHolder, Exception> consumer) throws Exception { synchronized (lock) { if (checker) { // remove them from checked routes so they don't get started by the @@ -258,7 +265,7 @@ public class SupervisingRouteController extends DefaultRouteController { routeManager.release(route); } - ServiceStatus status = context.getRouteStatus(route.getId()); + ServiceStatus status = route.getStatus(); if (!status.isStoppable()) { LOGGER.debug("Route {} status is {}, skipping", route.getId(), status); return; @@ -266,38 +273,28 @@ public class SupervisingRouteController extends DefaultRouteController { consumer.accept(route); - startedRoutes.remove(route); - stoppedRoutes.add(route); - // Mark the route as un-managed - route.getRouteContext().setRouteController(null); + route.getContext().setRouteController(null); } } - private void doStartRoute(CamelContext context, Route route, boolean checker, ThrowingConsumer<Route, Exception> consumer) throws Exception { + private void doStartRoute(RouteHolder route, boolean checker, ThrowingConsumer<RouteHolder, Exception> consumer) throws Exception { synchronized (lock) { - ServiceStatus status = context.getRouteStatus(route.getId()); + ServiceStatus status = route.getStatus(); if (!status.isStartable()) { LOGGER.debug("Route {} status is {}, skipping", route.getId(), status); return; } try { - // remove the route from any queue - stoppedRoutes.remove(route); - startedRoutes.remove(route); - if (checker) { routeManager.release(route); } // Mark the route as managed - route.getRouteContext().setRouteController(this); + route.getContext().setRouteController(this); consumer.accept(route); - - // route started successfully - startedRoutes.add(route); } catch (Exception e) { if (checker) { @@ -316,13 +313,16 @@ public class SupervisingRouteController extends DefaultRouteController { return; } - List<String> routes; + final List<String> routeList; synchronized (lock) { - routes = stoppedRoutes.stream().map(Route::getId).collect(Collectors.toList()); + routeList = routes.stream() + .filter(r -> r.getStatus() == ServiceStatus.Stopped) + .map(RouteHolder::getId) + .collect(Collectors.toList()); } - for (String route: routes) { + for (String route: routeList) { try { startRoute(route); } catch (Exception e) { @@ -332,13 +332,20 @@ public class SupervisingRouteController extends DefaultRouteController { } private synchronized void stopRoutes() { - List<String> routes; + if (!isRunAllowed()) { + return; + } + + final List<String> routeList; synchronized (lock) { - routes = startedRoutes.stream().map(Route::getId).collect(Collectors.toList()); + routeList = routes.stream() + .filter(r -> r.getStatus() == ServiceStatus.Started) + .map(RouteHolder::getId) + .collect(Collectors.toList()); } - for (String route: routes) { + for (String route: routeList) { try { stopRoute(route); } catch (Exception e) { @@ -353,17 +360,15 @@ public class SupervisingRouteController extends DefaultRouteController { private class RouteManager { private final Logger logger; - private final ConcurrentMap<Route, CompletableFuture<BackOffContext>> routes; + private final ConcurrentMap<RouteHolder, CompletableFuture<BackOffContext>> routes; RouteManager() { this.logger = LoggerFactory.getLogger(RouteManager.class); this.routes = new ConcurrentHashMap<>(); } - void start(Route route) { - route.getRouteContext().setRouteController(SupervisingRouteController.this); - - final CamelContext camelContext = getCamelContext(); + void start(RouteHolder route) { + route.getContext().setRouteController(SupervisingRouteController.this); routes.computeIfAbsent( route, @@ -378,7 +383,7 @@ public class SupervisingRouteController extends DefaultRouteController { try { logger.info("Try to restart route: {}", r.getId()); - doStartRoute(camelContext, r, false, rx -> SupervisingRouteController.super.startRoute(rx.getId())); + doStartRoute(r, false, rx -> SupervisingRouteController.super.startRoute(rx.getId())); return false; } catch (Exception e) { return true; @@ -396,17 +401,12 @@ public class SupervisingRouteController extends DefaultRouteController { } synchronized (lock) { - ServiceStatus status = camelContext.getRouteStatus(route.getId()); + final ServiceStatus status = route.getStatus(); if (status.isStopped() || status.isStopping()) { LOGGER.info("Route {} has status {}, stop supervising it", route.getId(), status); - r.getRouteContext().setRouteController(null); - stoppedRoutes.add(r); - } else if (status.isStarted() || status.isStarting()) { - synchronized (lock) { - startedRoutes.add(r); - } + r.getContext().setRouteController(null); } } } @@ -419,7 +419,7 @@ public class SupervisingRouteController extends DefaultRouteController { ); } - boolean release(Route route) { + boolean release(RouteHolder route) { CompletableFuture<BackOffContext> future = routes.remove(route); if (future != null) { future.cancel(true); @@ -433,18 +433,87 @@ public class SupervisingRouteController extends DefaultRouteController { routes.clear(); } - boolean isSupervising(Route route) { + boolean isSupervising(RouteHolder route) { return routes.containsKey(route); } - Collection<Route> routes() { + Collection<RouteHolder> routes() { return routes.keySet(); } } - private boolean isSupervising(Route route) { - synchronized (lock) { - return stoppedRoutes.contains(route) || startedRoutes.contains(route) || routeManager.isSupervising(route); + // ********************************* + // + // ********************************* + + private class RouteHolder implements HasId, Comparable<RouteHolder> { + private final int order; + private final Route route; + + RouteHolder(Route route, int order) { + this.route = route; + this.order = order; + } + + @Override + public String getId() { + return this.route.getId(); + } + + public Route get() { + return this.route; + } + + public RouteContext getContext() { + return this.route.getRouteContext(); + } + + public RouteDefinition getDefinition() { + return this.route.getRouteContext().getRoute(); + } + + public ServiceStatus getStatus() { + return getContext().getCamelContext().getRouteStatus(getId()); + } + + public int getInitializationOrder() { + return order; + } + + public int getStartupOrder() { + Integer order = getDefinition().getStartupOrder(); + if (order == null) { + order = Integer.MAX_VALUE; + } + + return order; + } + + @Override + public int compareTo(RouteHolder o) { + int answer = Integer.compare(getStartupOrder(), o.getStartupOrder()); + if (answer == 0) { + answer = Integer.compare(getInitializationOrder(), o.getInitializationOrder()); + } + + return answer; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + return this.route.equals(((RouteHolder)o).route); + } + + @Override + public int hashCode() { + return route.hashCode(); } } @@ -464,29 +533,35 @@ public class SupervisingRouteController extends DefaultRouteController { private class ManagedRoutePolicy implements RoutePolicy { @Override public void onInit(Route route) { - route.getRouteContext().setRouteController(SupervisingRouteController.this); - route.getRouteContext().getRoute().setAutoStartup("false"); - - if (contextStarted.get()) { - LOGGER.debug("Context is started: add route {} to startable routes", route.getId()); - try { - SupervisingRouteController.this.doStartRoute( - getCamelContext(), - route, - true, - r -> SupervisingRouteController.super.startRoute(r.getId()) - ); - } catch (Exception e) { - e.printStackTrace(); + RouteHolder holder = new RouteHolder(route, routeCount.incrementAndGet()); + if (routes.add(holder)) { + holder.getContext().setRouteController(SupervisingRouteController.this); + holder.getDefinition().setAutoStartup("false"); + + if (contextStarted.get()) { + LOGGER.debug("Context is started: attempt to start route {}", route.getId()); + try { + SupervisingRouteController.this.doStartRoute( + holder, + true, + r -> SupervisingRouteController.super.startRoute(r.getId()) + ); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + } else { + LOGGER.debug("Context is not started: add route {} to stopped routes", holder.getId()); } - } else { - LOGGER.debug("Context is not started: add route {} to stopped routes", route.getId()); - stoppedRoutes.add(route); } } @Override public void onRemove(Route route) { + synchronized (lock) { + routes.removeIf( + r -> ObjectHelper.equal(r.get(), route) || ObjectHelper.equal(r.getId(), route.getId()) + ); + } } @Override