This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new a2f6647 CAMEL-15035: SupervisingRouteController - JMX management a2f6647 is described below commit a2f664737775b8e627632a5e3456eb89cd59eafc Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun May 10 17:51:03 2020 +0200 CAMEL-15035: SupervisingRouteController - JMX management --- .../camel/spi/SupervisingRouteController.java | 12 ++ .../engine/DefaultSupervisingRouteController.java | 7 ++ .../ManagedSupervisingRouteControllerMBean.java | 59 +++++++++ .../management/JmxManagementLifecycleStrategy.java | 4 + .../mbean/ManagedSupervisingRouteController.java | 93 ++++++++++++++ .../ManagedSupervisingRouteControllerTest.java | 137 +++++++++++++++++++++ 6 files changed, 312 insertions(+) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java b/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java index 0e16f68..e0b6228 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java @@ -16,6 +16,10 @@ */ package org.apache.camel.spi; +import java.util.Collection; + +import org.apache.camel.Route; + /** * A supervising capable {@link RouteController} that delays the startup * of the routes after the camel context startup and takes control of starting the routes in a safe manner. @@ -111,4 +115,12 @@ public interface SupervisingRouteController extends RouteController { */ void setBackOffMultiplier(double backOffMultiplier); + /** + * Return the list of routes that are currently under restarting by this controller. + * + * In other words the routes which have failed during startup and are now managed + * to be restarted. 
+ */ + Collection<Route> getRestartingRoutes(); + } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java index 3725145..9b7ce52 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java @@ -318,6 +318,13 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im .collect(Collectors.toList()); } + @Override + public Collection<Route> getRestartingRoutes() { + return routeManager.routes.keySet().stream() + .map(RouteHolder::get) + .collect(Collectors.toList()); + } + // ********************************* // Helpers // ********************************* diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSupervisingRouteControllerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSupervisingRouteControllerMBean.java new file mode 100644 index 0000000..a5ca7be --- /dev/null +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSupervisingRouteControllerMBean.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.api.management.mbean; + +import org.apache.camel.api.management.ManagedAttribute; + +public interface ManagedSupervisingRouteControllerMBean extends ManagedServiceMBean { + + @ManagedAttribute(description = "Whether supervising is enabled") + boolean isEnabled(); + + @ManagedAttribute(description = "The number of threads used by the scheduled thread pool that are used for restarting routes") + int getThreadPoolSize(); + + @ManagedAttribute(description = "Initial delay in milli seconds before the route controller starts") + long getInitialDelay(); + + @ManagedAttribute(description = "Backoff delay in millis when restarting a route that failed to startup") + long getBackOffDelay(); + + @ManagedAttribute(description = "Backoff maximum delay in millis when restarting a route that failed to startup") + long getBackOffMaxDelay(); + + @ManagedAttribute(description = "Backoff maximum elapsed time in millis, after which the backoff should be considered exhausted and no more attempts should be made") + long getBackOffMaxElapsedTime(); + + @ManagedAttribute(description = "Backoff maximum number of attempts to restart a route that failed to startup") + long getBackOffMaxAttempts(); + + @ManagedAttribute(description = "Backoff multiplier to use for exponential backoff") + double getBackOffMultiplier(); + + @ManagedAttribute(description = "Pattern for filtering routes to be included as supervised") + String getIncludeRoutes(); + + @ManagedAttribute(description = "Pattern for filtering routes to be excluded as supervised") + String 
getExcludeRoutes(); + + @ManagedAttribute(description = "Number of routes controlled by the controller") + int getNumberOfControlledRoutes(); + + @ManagedAttribute(description = "Number of routes which have failed to startup and are currently managed to be restarted") + int getNumberOfRestartingRoutes(); + +} diff --git a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java index 48a9d0b..bb9d359 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java @@ -65,6 +65,7 @@ import org.apache.camel.management.mbean.ManagedRoute; import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry; import org.apache.camel.management.mbean.ManagedService; import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; +import org.apache.camel.management.mbean.ManagedSupervisingRouteController; import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy; import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; import org.apache.camel.management.mbean.ManagedTracer; @@ -98,6 +99,7 @@ import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.RestRegistry; import org.apache.camel.spi.RuntimeEndpointRegistry; import org.apache.camel.spi.StreamCachingStrategy; +import org.apache.camel.spi.SupervisingRouteController; import org.apache.camel.spi.Tracer; import org.apache.camel.spi.TransformerRegistry; import org.apache.camel.spi.TypeConverterRegistry; @@ -549,6 +551,8 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li answer = new ManagedValidatorRegistry(context, (ValidatorRegistry)service); } else if (service instanceof CamelClusterService) { answer = 
getManagementObjectStrategy().getManagedObjectForClusterService(context, (CamelClusterService)service); + } else if (service instanceof SupervisingRouteController) { + answer = new ManagedSupervisingRouteController(context, (SupervisingRouteController) service); } else if (service != null) { // fallback as generic service answer = getManagementObjectStrategy().getManagedObjectForService(context, service); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedSupervisingRouteController.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedSupervisingRouteController.java new file mode 100644 index 0000000..96dc616 --- /dev/null +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedSupervisingRouteController.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.management.mbean; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedSupervisingRouteControllerMBean; +import org.apache.camel.spi.SupervisingRouteController; + +@ManagedResource(description = "Managed SupervisingRouteController") +public class ManagedSupervisingRouteController extends ManagedService implements ManagedSupervisingRouteControllerMBean { + + private final SupervisingRouteController controller; + + public ManagedSupervisingRouteController(CamelContext context, SupervisingRouteController controller) { + super(context, controller); + this.controller = controller; + } + + @Override + public boolean isEnabled() { + return true; + } + + @Override + public int getThreadPoolSize() { + return controller.getThreadPoolSize(); + } + + @Override + public long getInitialDelay() { + return controller.getInitialDelay(); + } + + @Override + public long getBackOffDelay() { + return controller.getBackOffDelay(); + } + + @Override + public long getBackOffMaxDelay() { + return controller.getBackOffMaxDelay(); + } + + @Override + public long getBackOffMaxElapsedTime() { + return controller.getBackOffMaxElapsedTime(); + } + + @Override + public long getBackOffMaxAttempts() { + return controller.getBackOffMaxAttempts(); + } + + @Override + public double getBackOffMultiplier() { + return controller.getBackOffMultiplier(); + } + + @Override + public String getIncludeRoutes() { + return controller.getIncludeRoutes(); + } + + @Override + public String getExcludeRoutes() { + return controller.getExcludeRoutes(); + } + + @Override + public int getNumberOfControlledRoutes() { + return controller.getControlledRoutes().size(); + } + + @Override + public int getNumberOfRestartingRoutes() { + return controller.getRestartingRoutes().size(); + } +} diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedSupervisingRouteControllerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedSupervisingRouteControllerTest.java new file mode 100644 index 0000000..4b51a07 --- /dev/null +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedSupervisingRouteControllerTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.management; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.seda.SedaComponent; +import org.apache.camel.component.seda.SedaConsumer; +import org.apache.camel.component.seda.SedaEndpoint; +import org.apache.camel.impl.engine.DefaultSupervisingRouteController; +import org.apache.camel.spi.SupervisingRouteController; +import org.junit.Test; + +import static org.awaitility.Awaitility.await; + +public class ManagedSupervisingRouteControllerTest extends ManagementTestSupport { + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + SupervisingRouteController src = new DefaultSupervisingRouteController(); + src.setThreadPoolSize(2); + src.setBackOffDelay(250); + src.setBackOffMaxAttempts(3); + context.setRouteController(src); + return context; + } + + @Test + public void testSupervisingRouteController() throws Exception { + // JMX tests don't work well on AIX CI servers (they hang) + if (isPlatform("aix")) { + return; + } + + // look up the MBean server used by the test + MBeanServer mbeanServer = getMBeanServer(); + + // object name of the supervising route controller MBean + ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultSupervisingRouteController"); + assertTrue(mbeanServer.isRegistered(on)); + + Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); + assertTrue(enabled); + + Integer threadPoolSize = (Integer) mbeanServer.getAttribute(on, "ThreadPoolSize"); + assertEquals(2, threadPoolSize.intValue()); + + Long backOffDelay = (Long) mbeanServer.getAttribute(on, "BackOffDelay"); + assertEquals(250, 
backOffDelay.intValue()); + + Integer routes = (Integer) mbeanServer.getAttribute(on, "NumberOfControlledRoutes"); + assertEquals(3, routes.intValue()); + + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + Integer restarting = (Integer) mbeanServer.getAttribute(on, "NumberOfRestartingRoutes"); + assertEquals(2, restarting.intValue()); + }); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + getContext().addComponent("jms", new MyJmsComponent()); + + from("timer:foo").to("mock:foo").routeId("foo"); + + from("jms:cheese").to("mock:cheese").routeId("cheese"); + + from("jms:cake").to("mock:cake").routeId("cake"); + + from("seda:bar").routeId("bar").noAutoStartup().to("mock:bar"); + } + }; + } + + private class MyJmsComponent extends SedaComponent { + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + return new MyJmsEndpoint(); + } + } + + private class MyJmsEndpoint extends SedaEndpoint { + + public MyJmsEndpoint() { + super(); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new MyJmsConsumer(this, processor); + } + + @Override + protected String createEndpointUri() { + return "jms:cheese"; + } + } + + private class MyJmsConsumer extends SedaConsumer { + + public MyJmsConsumer(SedaEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + protected void doStart() throws Exception { + throw new IllegalArgumentException("Cannot start"); + } + } +}