This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 34c028e CAMEL-15035: SupervisingRouteController - JMX management 34c028e is described below commit 34c028e1e514bffe246740e98eda321488c754a3 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon May 11 10:52:20 2020 +0200 CAMEL-15035: SupervisingRouteController - JMX management --- .../src/main/java/org/apache/camel/Route.java | 6 ++ .../camel/spi/ManagementObjectNameStrategy.java | 2 +- .../apache/camel/spi/ManagementObjectStrategy.java | 2 +- .../camel/spi/SupervisingRouteController.java | 17 ++++ .../camel/impl/engine/DefaultRouteController.java | 12 ++- .../engine/DefaultSupervisingRouteController.java | 21 ++++- .../main/MainSupervisingRouteControllerTest.java | 8 ++ .../api/management/mbean/CamelOpenMBeanTypes.java | 13 +++ .../mbean/ManagedRouteControllerMBean.java | 2 +- .../mbean/ManagedRuntimeEndpointRegistryMBean.java | 4 +- .../ManagedSupervisingRouteControllerMBean.java | 13 ++- .../DefaultManagementObjectNameStrategy.java | 11 ++- .../DefaultManagementObjectStrategy.java | 12 ++- .../management/JmxManagementLifecycleStrategy.java | 15 ++-- .../management/mbean/ManagedRouteController.java | 22 ++--- .../mbean/ManagedSupervisingRouteController.java | 95 ++++++++++++++++++++++ .../ManagedSupervisingRouteControllerTest.java | 14 ++-- .../apache/camel/util/backoff/BackOffTimer.java | 5 ++ .../camel/util/backoff/BackOffTimerTask.java | 13 +++ .../camel/util/backoff/BackOffTimerTest.java | 11 ++- 20 files changed, 257 insertions(+), 41 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java b/core/camel-api/src/main/java/org/apache/camel/Route.java index 13586e6..ae4f818 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Route.java +++ b/core/camel-api/src/main/java/org/apache/camel/Route.java @@ -180,8 +180,14 @@ public interface Route extends RuntimeConfiguration { */ void setLastError(RouteError error); + /** + * Gets the route startup order + 
*/ Integer getStartupOrder(); + /** + * Sets the route startup order + */ void setStartupOrder(Integer startupOrder); /** diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectNameStrategy.java b/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectNameStrategy.java index 1656ff8..e6ccdd08 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectNameStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectNameStrategy.java @@ -46,7 +46,7 @@ public interface ManagementObjectNameStrategy { ObjectName getObjectNameForCamelContext(CamelContext context) throws MalformedObjectNameException; - ObjectName getObjectNameForRouteController(CamelContext context) throws MalformedObjectNameException; + ObjectName getObjectNameForRouteController(CamelContext context, RouteController controller) throws MalformedObjectNameException; ObjectName getObjectNameForComponent(Component component, String name) throws MalformedObjectNameException; diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java b/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java index 9649519..b906d13 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java @@ -48,7 +48,7 @@ public interface ManagementObjectStrategy { Object getManagedObjectForErrorHandler(CamelContext context, Route route, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder); - Object getManagedObjectForRouteController(CamelContext context); + Object getManagedObjectForRouteController(CamelContext context, RouteController routeController); Object getManagedObjectForRoute(CamelContext context, Route route); diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java index e0b6228..cb6e6e1 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java @@ -19,6 +19,7 @@ package org.apache.camel.spi; import java.util.Collection; import org.apache.camel.Route; +import org.apache.camel.util.backoff.BackOffTimer; /** * A supervising capable {@link RouteController} that delays the startup @@ -123,4 +124,20 @@ public interface SupervisingRouteController extends RouteController { */ Collection<Route> getRestartingRoutes(); + /** + * Gets the state of the backoff for the given route if its managed and under restarting. + * + * @param routeId the route id + * @return the state, or <tt>null</tt> if the route is not under restarting + */ + BackOffTimer.Task getRestartingRouteState(String routeId); + + /** + * Gets the last exception that caused the route to not startup for the given route + * + * @param routeId the route id + * @return the caused exception + */ + Throwable getRestartException(String routeId); + } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteController.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteController.java index b0a3891..a13e13a 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteController.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteController.java @@ -22,12 +22,22 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.NonManagedService; import org.apache.camel.Route; import org.apache.camel.ServiceStatus; import org.apache.camel.spi.RouteController; import org.apache.camel.support.service.ServiceSupport; -public class DefaultRouteController extends ServiceSupport implements 
RouteController { +/** + * A default {@link RouteController} that starts the routes in a fail-fast mode, which means + * if any of the routes fail to startup then this causes Camel to fail to startup as well. + * + * @see DefaultSupervisingRouteController + */ +public class DefaultRouteController extends ServiceSupport implements RouteController, NonManagedService { + + // mark this as non managed service as its registered specially as a route controller + private CamelContext camelContext; public DefaultRouteController() { diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java index 9b7ce52..948cd5f 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java @@ -36,6 +36,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.ExtendedStartupListener; import org.apache.camel.FailedToStartRouteException; import org.apache.camel.NamedNode; +import org.apache.camel.NonManagedService; import org.apache.camel.Route; import org.apache.camel.RuntimeCamelException; import org.apache.camel.ServiceStatus; @@ -58,6 +59,8 @@ import org.slf4j.LoggerFactory; * of the routes after the camel context startup and takes control of starting the routes in a safe manner. * This controller is able to retry starting failing routes, and have various options to configure * settings for backoff between restarting routes. 
+ * + * @see DefaultRouteController */ public class DefaultSupervisingRouteController extends DefaultRouteController implements SupervisingRouteController { @@ -325,6 +328,16 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im .collect(Collectors.toList()); } + @Override + public BackOffTimer.Task getRestartingRouteState(String routeId) { + return routeManager.getBackOffContext(routeId).orElse(null); + } + + @Override + public Throwable getRestartException(String routeId) { + return routeManager.exceptions.get(routeId); + } + // ********************************* // Helpers // ********************************* @@ -441,10 +454,12 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im private class RouteManager { private final Logger logger; private final ConcurrentMap<RouteHolder, BackOffTimer.Task> routes; + private final ConcurrentMap<String, Throwable> exceptions; RouteManager() { this.logger = LoggerFactory.getLogger(RouteManager.class); this.routes = new ConcurrentHashMap<>(); + this.exceptions = new ConcurrentHashMap<>(); } void start(RouteHolder route) { @@ -465,6 +480,7 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im doStartRoute(r, false, rx -> DefaultSupervisingRouteController.super.startRoute(rx.getId())); return false; } catch (Exception e) { + exceptions.put(r.getId(), e); String cause = e.getClass().getName() + ": " + e.getMessage(); logger.info("Failed restarting route: {} attempt: {} due: {} (stacktrace in debug log level)", r.getId(), attempt, cause); logger.debug(" Error restarting route caused by: " + e.getMessage(), e); @@ -501,6 +517,7 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im } boolean release(RouteHolder route) { + exceptions.remove(route.getId()); BackOffTimer.Task task = routes.remove(route); if (task != null) { LOG.info("Cancel restart task for route {}", route.getId()); @@ -598,7 +615,9 @@ public class 
DefaultSupervisingRouteController extends DefaultRouteController im } } - private class ManagedRoutePolicy extends RoutePolicySupport { + private class ManagedRoutePolicy extends RoutePolicySupport implements NonManagedService { + + // we dont want this policy to be registed in JMX private void startRoute(RouteHolder holder) { try { diff --git a/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerTest.java b/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerTest.java index b8fbfff..ff88fbd 100644 --- a/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerTest.java +++ b/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerTest.java @@ -27,6 +27,7 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.seda.SedaComponent; import org.apache.camel.component.seda.SedaConsumer; import org.apache.camel.component.seda.SedaEndpoint; +import org.apache.camel.spi.SupervisingRouteController; import org.junit.Assert; import org.junit.Test; @@ -64,6 +65,13 @@ public class MainSupervisingRouteControllerTest extends Assert { assertEquals("Stopped", main.camelContext.getRouteController().getRouteStatus("cheese").toString()); // cake was not able to start assertEquals("Stopped", main.camelContext.getRouteController().getRouteStatus("cake").toString()); + + SupervisingRouteController src = (SupervisingRouteController) main.camelContext.getRouteController(); + Throwable e = src.getRestartException("cake"); + assertNotNull(e); + assertEquals("Cannot start", e.getMessage()); + assertTrue(e instanceof IllegalArgumentException); + // bar is no auto startup assertEquals("Stopped", main.camelContext.getRouteController().getRouteStatus("bar").toString()); diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java index a2f555f..192d2c9 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java @@ -212,4 +212,17 @@ public final class CamelOpenMBeanTypes { CompositeType ct = camelRoutePropertiesCompositeType(); return new TabularType("routeProperties", "Route Properties", ct, new String[]{"key"}); } + + public static TabularType supervisingRouteControllerRouteStatusTabularType() throws OpenDataException { + CompositeType ct = supervisingRouteControllerRouteStatusCompositeType(); + return new TabularType("routeStatus", "Lists detailed status about all the routes (incl failure details for routes failed to start)", ct, new String[]{"index"}); + } + + public static CompositeType supervisingRouteControllerRouteStatusCompositeType() throws OpenDataException { + return new CompositeType("routes", "Routes", + new String[]{"index", "routeId", "status", "supervising", "attempts", "elapsed", "last", "error", "stacktrace"}, + new String[]{"Index", "Route Id", "Status", "Supervising", "Attempts", "Elapsed", "Since Last Attempt", "Error", "Stacktrace"}, + new OpenType[]{SimpleType.INTEGER, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING}); + } + } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteControllerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteControllerMBean.java index 6b4282a..818765d 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteControllerMBean.java +++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteControllerMBean.java @@ -20,7 +20,7 @@ import java.util.Collection; import org.apache.camel.api.management.ManagedAttribute; -public interface ManagedRouteControllerMBean { +public interface ManagedRouteControllerMBean extends ManagedServiceMBean { @ManagedAttribute(description = "Controlled Routes") Collection<String> getControlledRoutes(); diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRuntimeEndpointRegistryMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRuntimeEndpointRegistryMBean.java index 00513d3..f7df80c 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRuntimeEndpointRegistryMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRuntimeEndpointRegistryMBean.java @@ -43,10 +43,10 @@ public interface ManagedRuntimeEndpointRegistryMBean extends ManagedServiceMBean @ManagedAttribute(description = "Number of endpoints currently in the registry") int getSize(); - @ManagedOperation(description = " Gets all the endpoint urls captured during runtime that are in-use") + @ManagedOperation(description = "Gets all the endpoint urls captured during runtime that are in-use") List<String> getAllEndpoints(boolean includeInputs); - @ManagedOperation(description = " Gets all the endpoint urls captured during runtime that are in-use for the given route") + @ManagedOperation(description = "Gets all the endpoint urls captured during runtime that are in-use for the given route") List<String> getEndpointsPerRoute(String routeId, boolean includeInputs); @ManagedOperation(description = "Lists statistics about all the endpoints in the registry") diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSupervisingRouteControllerMBean.java 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSupervisingRouteControllerMBean.java index a5ca7be..563fb9b 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSupervisingRouteControllerMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSupervisingRouteControllerMBean.java @@ -16,9 +16,14 @@ */ package org.apache.camel.api.management.mbean; +import javax.management.openmbean.TabularData; + +import java.util.Collection; + import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; -public interface ManagedSupervisingRouteControllerMBean extends ManagedServiceMBean { +public interface ManagedSupervisingRouteControllerMBean extends ManagedRouteControllerMBean { @ManagedAttribute(description = "Whether supervising is enabled") boolean isEnabled(); @@ -56,4 +61,10 @@ public interface ManagedSupervisingRouteControllerMBean extends ManagedServiceMB @ManagedAttribute(description = "Number of routes which have failed to startup and are currently managed to be restarted") int getNumberOfRestartingRoutes(); + @ManagedAttribute(description = "Controlled Routes") + Collection<String> getRestartingRoutes(); + + @ManagedOperation(description = "Lists detailed status about all the routes (incl failure details for routes failed to start)") + TabularData routeStatus(boolean restartingOnly, boolean includeStacktrace); + } diff --git a/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectNameStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectNameStrategy.java index 9c7cb3d..ce56dc6 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectNameStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectNameStrategy.java @@ 
-53,12 +53,14 @@ import org.apache.camel.management.mbean.ManagedRoute; import org.apache.camel.management.mbean.ManagedRouteController; import org.apache.camel.management.mbean.ManagedService; import org.apache.camel.management.mbean.ManagedStep; +import org.apache.camel.management.mbean.ManagedSupervisingRouteController; import org.apache.camel.management.mbean.ManagedThreadPool; import org.apache.camel.management.mbean.ManagedTracer; import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; import org.apache.camel.spi.DataFormat; import org.apache.camel.spi.EventNotifier; import org.apache.camel.spi.ManagementObjectNameStrategy; +import org.apache.camel.spi.RouteController; import org.apache.camel.util.InetAddressUtil; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; @@ -131,7 +133,10 @@ public class DefaultManagementObjectNameStrategy implements ManagementObjectName objectName = getObjectNameForCamelHealth(mch.getContext()); } else if (managedObject instanceof ManagedRouteController) { ManagedRouteController mrc = (ManagedRouteController) managedObject; - objectName = getObjectNameForRouteController(mrc.getContext()); + objectName = getObjectNameForRouteController(mrc.getContext(), mrc.getRouteController()); + } else if (managedObject instanceof ManagedSupervisingRouteController) { + ManagedSupervisingRouteController mrc = (ManagedSupervisingRouteController) managedObject; + objectName = getObjectNameForRouteController(mrc.getContext(), mrc.getRouteController()); } else if (managedObject instanceof ManagedComponent) { ManagedComponent mc = (ManagedComponent) managedObject; objectName = getObjectNameForComponent(mc.getComponent(), mc.getComponentName()); @@ -231,7 +236,7 @@ public class DefaultManagementObjectNameStrategy implements ManagementObjectName } @Override - public ObjectName getObjectNameForRouteController(CamelContext context) throws MalformedObjectNameException { + public ObjectName 
getObjectNameForRouteController(CamelContext context, RouteController routeController) throws MalformedObjectNameException { // prefer to use the given management name if previously assigned String managementName = context.getManagementName(); if (managementName == null) { @@ -242,7 +247,7 @@ public class DefaultManagementObjectNameStrategy implements ManagementObjectName buffer.append(domainName).append(":"); buffer.append(KEY_CONTEXT + "=").append(getContextId(managementName)).append(","); buffer.append(KEY_TYPE + "=" + TYPE_ROUTE_CONTROLLER + ","); - buffer.append(KEY_NAME + "=").append(ObjectName.quote(context.getName())); + buffer.append(KEY_NAME + "=").append(routeController.getClass().getSimpleName()); return createObjectName(buffer); } diff --git a/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java index 5fc59e0..8122314 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java @@ -89,6 +89,7 @@ import org.apache.camel.management.mbean.ManagedSplitter; import org.apache.camel.management.mbean.ManagedStep; import org.apache.camel.management.mbean.ManagedStickyLoadBalancer; import org.apache.camel.management.mbean.ManagedStop; +import org.apache.camel.management.mbean.ManagedSupervisingRouteController; import org.apache.camel.management.mbean.ManagedSuspendableRoute; import org.apache.camel.management.mbean.ManagedThreadPool; import org.apache.camel.management.mbean.ManagedThreads; @@ -174,6 +175,8 @@ import org.apache.camel.spi.BrowsableEndpoint; import org.apache.camel.spi.DataFormat; import org.apache.camel.spi.EventNotifier; import org.apache.camel.spi.ManagementObjectStrategy; +import org.apache.camel.spi.RouteController; +import 
org.apache.camel.spi.SupervisingRouteController; import org.apache.camel.support.ScheduledPollConsumer; import org.apache.camel.support.processor.MarshalProcessor; import org.apache.camel.support.processor.PredicateValidatingProcessor; @@ -243,8 +246,13 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy } @Override - public Object getManagedObjectForRouteController(CamelContext context) { - ManagedRouteController mrc = new ManagedRouteController(context); + public Object getManagedObjectForRouteController(CamelContext context, RouteController routeController) { + ManagedService mrc; + if (routeController instanceof SupervisingRouteController) { + mrc = new ManagedSupervisingRouteController(context, (SupervisingRouteController) routeController); + } else { + mrc = new ManagedRouteController(context, routeController); + } mrc.init(context.getManagementStrategy()); return mrc; } diff --git a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java index bb9d359..715604b 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java @@ -16,6 +16,9 @@ */ package org.apache.camel.management; +import javax.management.JMException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -26,10 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; -import javax.management.JMException; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import 
org.apache.camel.Channel; @@ -65,7 +64,6 @@ import org.apache.camel.management.mbean.ManagedRoute; import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry; import org.apache.camel.management.mbean.ManagedService; import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; -import org.apache.camel.management.mbean.ManagedSupervisingRouteController; import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy; import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; import org.apache.camel.management.mbean.ManagedTracer; @@ -99,7 +97,6 @@ import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.RestRegistry; import org.apache.camel.spi.RuntimeEndpointRegistry; import org.apache.camel.spi.StreamCachingStrategy; -import org.apache.camel.spi.SupervisingRouteController; import org.apache.camel.spi.Tracer; import org.apache.camel.spi.TransformerRegistry; import org.apache.camel.spi.TypeConverterRegistry; @@ -248,7 +245,7 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li } try { - Object me = getManagementObjectStrategy().getManagedObjectForRouteController(camelContext); + Object me = getManagementObjectStrategy().getManagedObjectForRouteController(camelContext, camelContext.getRouteController()); if (me == null) { // endpoint should not be managed return; @@ -311,7 +308,7 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li } try { - Object mc = getManagementObjectStrategy().getManagedObjectForRouteController(context); + Object mc = getManagementObjectStrategy().getManagedObjectForRouteController(context, context.getRouteController()); // the context could have been removed already if (getManagementStrategy().isManaged(mc)) { unmanageObject(mc); @@ -551,8 +548,6 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li answer = new ManagedValidatorRegistry(context, (ValidatorRegistry)service); } else if 
(service instanceof CamelClusterService) { answer = getManagementObjectStrategy().getManagedObjectForClusterService(context, (CamelClusterService)service); - } else if (service instanceof SupervisingRouteController) { - answer = new ManagedSupervisingRouteController(context, (SupervisingRouteController) service); } else if (service != null) { // fallback as generic service answer = getManagementObjectStrategy().getManagedObjectForService(context, service); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRouteController.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRouteController.java index c81d3b5..f0c72f4 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRouteController.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRouteController.java @@ -22,29 +22,31 @@ import java.util.stream.Collectors; import org.apache.camel.CamelContext; import org.apache.camel.Route; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.ManagedRouteControllerMBean; import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.RouteController; -public class ManagedRouteController implements ManagedRouteControllerMBean { - private final CamelContext context; +@ManagedResource(description = "Managed RouteController") +public class ManagedRouteController extends ManagedService implements ManagedRouteControllerMBean { - public ManagedRouteController(CamelContext context) { - this.context = context; + private final RouteController controller; + + public ManagedRouteController(CamelContext context, RouteController controller) { + super(context, controller); + this.controller = controller; } - public void init(ManagementStrategy strategy) { - // do nothing + public RouteController getRouteController() { + return controller; } - public CamelContext getContext() { - return 
context; + public void init(ManagementStrategy strategy) { + // do nothing } @Override public Collection<String> getControlledRoutes() { - RouteController controller = context.getRouteController(); - if (controller != null) { return controller.getControlledRoutes().stream() .map(Route::getId) diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedSupervisingRouteController.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedSupervisingRouteController.java index 96dc616..ca80059 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedSupervisingRouteController.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedSupervisingRouteController.java @@ -16,10 +16,26 @@ */ package org.apache.camel.management.mbean; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; + import org.apache.camel.CamelContext; +import org.apache.camel.Route; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; import org.apache.camel.api.management.mbean.ManagedSupervisingRouteControllerMBean; import org.apache.camel.spi.SupervisingRouteController; +import org.apache.camel.util.TimeUtils; +import org.apache.camel.util.backoff.BackOffTimer; @ManagedResource(description = "Managed SupervisingRouteController") public class ManagedSupervisingRouteController extends ManagedService implements ManagedSupervisingRouteControllerMBean { @@ -31,6 +47,10 @@ public class 
ManagedSupervisingRouteController extends ManagedService implements
         this.controller = controller;
     }
 
+    public SupervisingRouteController getRouteController() {
+        return controller;
+    }
+
     @Override
     public boolean isEnabled() {
         return true;
@@ -90,4 +110,79 @@ public class ManagedSupervisingRouteController extends ManagedService implements
     public int getNumberOfRestartingRoutes() {
         return controller.getRestartingRoutes().size();
     }
+
+    @Override
+    public Collection<String> getControlledRoutes() {
+        if (controller != null) {
+            return controller.getControlledRoutes().stream()
+                    .map(Route::getId)
+                    .collect(Collectors.toList());
+        }
+
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<String> getRestartingRoutes() {
+        if (controller != null) {
+            return controller.getRestartingRoutes().stream()
+                    .map(Route::getId)
+                    .collect(Collectors.toList());
+        }
+
+        return Collections.emptyList();
+    }
+
+    @Override
+    public TabularData routeStatus(boolean restartingOnly, boolean includeStacktrace) {
+        try {
+            TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.supervisingRouteControllerRouteStatusTabularType());
+
+            int index = 0;
+            Collection<Route> routes = restartingOnly ? controller.getRestartingRoutes() : controller.getControlledRoutes();
+            for (Route route : routes) {
+                CompositeType ct = CamelOpenMBeanTypes.supervisingRouteControllerRouteStatusCompositeType();
+
+                String routeId = route.getRouteId();
+                String status = controller.getRouteStatus(routeId).name();
+                BackOffTimer.Task state = controller.getRestartingRouteState(routeId);
+                String supervising = state != null ? state.getStatus().name() : "";
+                long attempts = state != null ? state.getCurrentAttempts() : 0;
+                String elapsed = "";
+                String last = "";
+                long time = state != null ? state.getFirstAttemptTime() : 0;
+                if (time > 0) {
+                    long delta = System.currentTimeMillis() - time;
+                    elapsed = TimeUtils.printDuration(delta);
+                }
+                time = state != null ? state.getLastAttemptTime() : 0;
+                if (time > 0) {
+                    long delta = System.currentTimeMillis() - time;
+                    last = TimeUtils.printDuration(delta);
+                }
+                String error = "";
+                String stacktrace = "";
+                Throwable cause = controller.getRestartException(routeId);
+                if (cause != null) {
+                    error = cause.getMessage();
+                    if (includeStacktrace) {
+                        StringWriter writer = new StringWriter();
+                        cause.printStackTrace(new PrintWriter(writer));
+                        writer.flush();
+                        stacktrace = writer.toString();
+                    }
+                }
+
+                CompositeData data = new CompositeDataSupport(ct, new String[]{"index", "routeId", "status", "supervising", "attempts", "elapsed", "last", "error", "stacktrace"},
+                        new Object[]{index, routeId, status, supervising, attempts, elapsed, last, error, stacktrace});
+                answer.put(data);
+
+                // use a counter as the single index in the TabularData as we do not want a multi-value index
+                index++;
+            }
+            return answer;
+        } catch (Exception e) {
+            throw RuntimeCamelException.wrapRuntimeCamelException(e);
+        }
+    }
 }
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedSupervisingRouteControllerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedSupervisingRouteControllerTest.java
index b218bc6..312b8ac 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedSupervisingRouteControllerTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedSupervisingRouteControllerTest.java
@@ -16,11 +16,11 @@
  */
 package org.apache.camel.management;
 
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
@@ -44,7 +44,7 @@ public class ManagedSupervisingRouteControllerTest extends ManagementTestSupport
         SupervisingRouteController src = new DefaultSupervisingRouteController();
         src.setThreadPoolSize(2);
         src.setBackOffDelay(250);
-        src.setBackOffMaxAttempts(3);
+        src.setBackOffMaxAttempts(100);
         context.setRouteController(src);
         return context;
     }
@@ -60,7 +60,7 @@ public class ManagedSupervisingRouteControllerTest extends ManagementTestSupport
         MBeanServer mbeanServer = getMBeanServer();
 
         // get the object name for the delayer
-        ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultSupervisingRouteController");
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=routecontrollers,name=DefaultSupervisingRouteController");
         assertTrue(mbeanServer.isRegistered(on));
 
         Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
@@ -79,6 +79,10 @@ public class ManagedSupervisingRouteControllerTest extends ManagementTestSupport
             Integer restarting = (Integer) mbeanServer.getAttribute(on, "NumberOfRestartingRoutes");
             assertEquals(2, restarting.intValue());
         });
+
+        TabularData data = (TabularData) mbeanServer.invoke(on, "routeStatus", new Object[]{true, true}, new String[]{"boolean", "boolean"});
+        assertNotNull(data);
+        assertEquals(2, data.size());
     }
 
     @Override
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java
index 909591a..e84b403 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java
@@ -87,6 +87,11 @@ public class BackOffTimer {
         long getCurrentElapsedTime();
 
         /**
+         * The time the first attempt was performed.
+         */
+        long getFirstAttemptTime();
+
+        /**
          * The time the last attempt has been performed.
          */
         long getLastAttemptTime();
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java
index 9564752..d3b58db 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java
@@ -34,6 +34,7 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable {
     private final List<BiConsumer<BackOffTimer.Task, Throwable>> consumers;
 
     private Status status;
+    private long firstAttemptTime;
     private long currentAttempts;
     private long currentDelay;
     private long currentElapsedTime;
@@ -48,6 +49,7 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable {
         this.currentAttempts = 0;
         this.currentDelay = backOff.getDelay().toMillis();
         this.currentElapsedTime = 0;
+        this.firstAttemptTime = BackOff.NEVER;
         this.lastAttemptTime = BackOff.NEVER;
         this.nextAttemptTime = BackOff.NEVER;
 
@@ -86,6 +88,11 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable {
     }
 
     @Override
+    public long getFirstAttemptTime() {
+        return firstAttemptTime;
+    }
+
+    @Override
     public long getLastAttemptTime() {
         return lastAttemptTime;
     }
@@ -100,6 +107,7 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable {
         this.currentAttempts = 0;
         this.currentDelay = 0;
         this.currentElapsedTime = 0;
+        this.firstAttemptTime = BackOff.NEVER;
         this.lastAttemptTime = BackOff.NEVER;
         this.nextAttemptTime = BackOff.NEVER;
         this.status = Status.Active;
@@ -134,6 +142,9 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable {
         if (status == Status.Active) {
             try {
                 lastAttemptTime = System.currentTimeMillis();
+                if (firstAttemptTime < 0) {
+                    firstAttemptTime = lastAttemptTime;
+                }
 
                 if (function.apply(this)) {
                     long delay = next();
@@ -167,6 +178,7 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable {
         this.currentAttempts = 0;
         this.currentDelay = BackOff.NEVER;
         this.currentElapsedTime = 0;
+        this.firstAttemptTime = BackOff.NEVER;
         this.lastAttemptTime = BackOff.NEVER;
         this.nextAttemptTime = BackOff.NEVER;
         this.status = Status.Inactive;
@@ -220,6 +232,7 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable {
             + ", currentAttempts=" + currentAttempts
             + ", currentDelay=" + currentDelay
             + ", currentElapsedTime=" + currentElapsedTime
+            + ", firstAttemptTime=" + firstAttemptTime
             + ", lastAttemptTime=" + lastAttemptTime
             + ", nextAttemptTime=" + nextAttemptTime
             + ']';
diff --git a/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java b/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java
index cf6a967..1cb911c 100644
--- a/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java
+++ b/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java
@@ -22,11 +22,13 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 public class BackOffTimerTest {
+
     @Test
     public void testBackOffTimer() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
@@ -34,14 +36,19 @@ public class BackOffTimerTest {
         final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
         final BackOff backOff = BackOff.builder().delay(100).build();
         final BackOffTimer timer = new BackOffTimer(executor);
+        final AtomicLong first = new AtomicLong();
 
         BackOffTimer.Task task = timer.schedule(
             backOff,
             context -> {
                 Assert.assertEquals(counter.incrementAndGet(), context.getCurrentAttempts());
                 Assert.assertEquals(100, context.getCurrentDelay());
-                Assert.assertEquals(100, context.getCurrentDelay());
                 Assert.assertEquals(100 * counter.get(), context.getCurrentElapsedTime());
+                if (first.get() == 0) {
+                    first.set(context.getFirstAttemptTime());
+                } else {
+                    Assert.assertEquals(first.get(), context.getFirstAttemptTime());
+                }
 
                 return counter.get() < 5;
             }
@@ -71,7 +78,6 @@ public class BackOffTimerTest {
             context -> {
                 Assert.assertEquals(counter.incrementAndGet(), context.getCurrentAttempts());
                 Assert.assertEquals(100, context.getCurrentDelay());
-                Assert.assertEquals(100, context.getCurrentDelay());
                 Assert.assertEquals(100 * counter.get(), context.getCurrentElapsedTime());
 
                 return true;
@@ -103,7 +109,6 @@ public class BackOffTimerTest {
             context -> {
                 Assert.assertEquals(counter.incrementAndGet(), context.getCurrentAttempts());
                 Assert.assertEquals(100, context.getCurrentDelay());
-                Assert.assertEquals(100, context.getCurrentDelay());
                 Assert.assertEquals(100 * counter.get(), context.getCurrentElapsedTime());
 
                 return true;