This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new a2f6647  CAMEL-15035: SupervisingRouteController - JMX management
a2f6647 is described below

commit a2f664737775b8e627632a5e3456eb89cd59eafc
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun May 10 17:51:03 2020 +0200

    CAMEL-15035: SupervisingRouteController - JMX management
---
 .../camel/spi/SupervisingRouteController.java      |  12 ++
 .../engine/DefaultSupervisingRouteController.java  |   7 ++
 .../ManagedSupervisingRouteControllerMBean.java    |  59 +++++++++
 .../management/JmxManagementLifecycleStrategy.java |   4 +
 .../mbean/ManagedSupervisingRouteController.java   |  93 ++++++++++++++
 .../ManagedSupervisingRouteControllerTest.java     | 137 +++++++++++++++++++++
 6 files changed, 312 insertions(+)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java
 
b/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java
index 0e16f68..e0b6228 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.spi;
 
+import java.util.Collection;
+
+import org.apache.camel.Route;
+
 /**
  * A supervising capable {@link RouteController} that delays the startup
  * of the routes after the camel context startup and takes control of starting 
the routes in a safe manner.
@@ -111,4 +115,12 @@ public interface SupervisingRouteController extends 
RouteController {
      */
     void setBackOffMultiplier(double backOffMultiplier);
 
+    /**
+     * Returns the routes that are currently being restarted by this controller.
+     *
+     * In other words, the routes which have failed during startup and are now
+     * managed to be restarted.
+     */
+    Collection<Route> getRestartingRoutes();
+
 }
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
index 3725145..9b7ce52 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
@@ -318,6 +318,13 @@ public class DefaultSupervisingRouteController extends 
DefaultRouteController im
             .collect(Collectors.toList());
     }
 
+    @Override
+    public Collection<Route> getRestartingRoutes() {
+        return routeManager.routes.keySet().stream()
+                .map(RouteHolder::get)
+                .collect(Collectors.toList());
+    }
+
     // *********************************
     // Helpers
     // *********************************
diff --git 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSupervisingRouteControllerMBean.java
 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSupervisingRouteControllerMBean.java
new file mode 100644
index 0000000..a5ca7be
--- /dev/null
+++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSupervisingRouteControllerMBean.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+
+public interface ManagedSupervisingRouteControllerMBean extends 
ManagedServiceMBean {
+
+    @ManagedAttribute(description = "Whether supervising is enabled")
+    boolean isEnabled();
+
+    @ManagedAttribute(description = "The number of threads used by the 
scheduled thread pool that are used for restarting routes")
+    int getThreadPoolSize();
+
+    @ManagedAttribute(description = "Initial delay in milli seconds before the 
route controller starts")
+    long getInitialDelay();
+
+    @ManagedAttribute(description = "Backoff delay in millis when restarting a 
route that failed to startup")
+    long getBackOffDelay();
+
+    @ManagedAttribute(description = "Backoff maximum delay in millis when 
restarting a route that failed to startup")
+    long getBackOffMaxDelay();
+
+    @ManagedAttribute(description = "Backoff maximum elapsed time in millis, 
after which the backoff should be considered exhausted and no more attempts 
should be made")
+    long getBackOffMaxElapsedTime();
+
+    @ManagedAttribute(description = "Backoff maximum number of attempts to 
restart a route that failed to startup")
+    long getBackOffMaxAttempts();
+
+    @ManagedAttribute(description = "Backoff multiplier to use for exponential 
backoff")
+    double getBackOffMultiplier();
+
+    @ManagedAttribute(description = "Pattern for filtering routes to be 
included as supervised")
+    String getIncludeRoutes();
+
+    @ManagedAttribute(description = "Pattern for filtering routes to be 
excluded as supervised")
+    String getExcludeRoutes();
+
+    @ManagedAttribute(description = "Number of routes controlled by the 
controller")
+    int getNumberOfControlledRoutes();
+
+    @ManagedAttribute(description = "Number of routes which have failed to 
startup and are currently managed to be restarted")
+    int getNumberOfRestartingRoutes();
+
+}
diff --git 
a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
 
b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
index 48a9d0b..bb9d359 100644
--- 
a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
+++ 
b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
@@ -65,6 +65,7 @@ import org.apache.camel.management.mbean.ManagedRoute;
 import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry;
 import org.apache.camel.management.mbean.ManagedService;
 import org.apache.camel.management.mbean.ManagedStreamCachingStrategy;
+import org.apache.camel.management.mbean.ManagedSupervisingRouteController;
 import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy;
 import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
 import org.apache.camel.management.mbean.ManagedTracer;
@@ -98,6 +99,7 @@ import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RestRegistry;
 import org.apache.camel.spi.RuntimeEndpointRegistry;
 import org.apache.camel.spi.StreamCachingStrategy;
+import org.apache.camel.spi.SupervisingRouteController;
 import org.apache.camel.spi.Tracer;
 import org.apache.camel.spi.TransformerRegistry;
 import org.apache.camel.spi.TypeConverterRegistry;
@@ -549,6 +551,8 @@ public class JmxManagementLifecycleStrategy extends 
ServiceSupport implements Li
             answer = new ManagedValidatorRegistry(context, 
(ValidatorRegistry)service);
         } else if (service instanceof CamelClusterService) {
             answer = 
getManagementObjectStrategy().getManagedObjectForClusterService(context, 
(CamelClusterService)service);
+        } else if (service instanceof SupervisingRouteController) {
+            answer = new ManagedSupervisingRouteController(context, 
(SupervisingRouteController) service);
         } else if (service != null) {
             // fallback as generic service
             answer = 
getManagementObjectStrategy().getManagedObjectForService(context, service);
diff --git 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedSupervisingRouteController.java
 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedSupervisingRouteController.java
new file mode 100644
index 0000000..96dc616
--- /dev/null
+++ 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedSupervisingRouteController.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import 
org.apache.camel.api.management.mbean.ManagedSupervisingRouteControllerMBean;
+import org.apache.camel.spi.SupervisingRouteController;
+
+@ManagedResource(description = "Managed SupervisingRouteController")
+public class ManagedSupervisingRouteController extends ManagedService 
implements ManagedSupervisingRouteControllerMBean {
+
+    private final SupervisingRouteController controller;
+
+    public ManagedSupervisingRouteController(CamelContext context, 
SupervisingRouteController controller) {
+        super(context, controller);
+        this.controller = controller;
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return true;
+    }
+
+    @Override
+    public int getThreadPoolSize() {
+        return controller.getThreadPoolSize();
+    }
+
+    @Override
+    public long getInitialDelay() {
+        return controller.getInitialDelay();
+    }
+
+    @Override
+    public long getBackOffDelay() {
+        return controller.getBackOffDelay();
+    }
+
+    @Override
+    public long getBackOffMaxDelay() {
+        return controller.getBackOffMaxDelay();
+    }
+
+    @Override
+    public long getBackOffMaxElapsedTime() {
+        return controller.getBackOffMaxElapsedTime();
+    }
+
+    @Override
+    public long getBackOffMaxAttempts() {
+        return controller.getBackOffMaxAttempts();
+    }
+
+    @Override
+    public double getBackOffMultiplier() {
+        return controller.getBackOffMultiplier();
+    }
+
+    @Override
+    public String getIncludeRoutes() {
+        return controller.getIncludeRoutes();
+    }
+
+    @Override
+    public String getExcludeRoutes() {
+        return controller.getExcludeRoutes();
+    }
+
+    @Override
+    public int getNumberOfControlledRoutes() {
+        return controller.getControlledRoutes().size();
+    }
+
+    @Override
+    public int getNumberOfRestartingRoutes() {
+        return controller.getRestartingRoutes().size();
+    }
+}
diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedSupervisingRouteControllerTest.java
 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedSupervisingRouteControllerTest.java
new file mode 100644
index 0000000..4b51a07
--- /dev/null
+++ 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedSupervisingRouteControllerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.seda.SedaComponent;
+import org.apache.camel.component.seda.SedaConsumer;
+import org.apache.camel.component.seda.SedaEndpoint;
+import org.apache.camel.impl.engine.DefaultSupervisingRouteController;
+import org.apache.camel.spi.SupervisingRouteController;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class ManagedSupervisingRouteControllerTest extends 
ManagementTestSupport {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        SupervisingRouteController src = new 
DefaultSupervisingRouteController();
+        src.setThreadPoolSize(2);
+        src.setBackOffDelay(250);
+        src.setBackOffMaxAttempts(3);
+        context.setRouteController(src);
+        return context;
+    }
+
+    @Test
+    public void testSupervisingRouteController() throws Exception {
+        // JMX tests don't work well on AIX CI servers (they hang them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the MBean server used to look up the managed controller
+        MBeanServer mbeanServer = getMBeanServer();
+
+        // get the object name for the supervising route controller
+        ObjectName on = 
ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultSupervisingRouteController");
+        assertTrue(mbeanServer.isRegistered(on));
+
+        Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertTrue(enabled);
+
+        Integer threadPoolSize = (Integer) mbeanServer.getAttribute(on, 
"ThreadPoolSize");
+        assertEquals(2, threadPoolSize.intValue());
+
+        Long backOffDelay = (Long) mbeanServer.getAttribute(on, 
"BackOffDelay");
+        assertEquals(250, backOffDelay.intValue());
+
+        Integer routes = (Integer) mbeanServer.getAttribute(on, 
"NumberOfControlledRoutes");
+        assertEquals(3, routes.intValue());
+
+        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            Integer restarting = (Integer) mbeanServer.getAttribute(on, 
"NumberOfRestartingRoutes");
+            assertEquals(2, restarting.intValue());
+        });
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                getContext().addComponent("jms", new MyJmsComponent());
+
+                from("timer:foo").to("mock:foo").routeId("foo");
+
+                from("jms:cheese").to("mock:cheese").routeId("cheese");
+
+                from("jms:cake").to("mock:cake").routeId("cake");
+
+                from("seda:bar").routeId("bar").noAutoStartup().to("mock:bar");
+            }
+        };
+    }
+
+    private class MyJmsComponent extends SedaComponent {
+
+        @Override
+        protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
+            return new MyJmsEndpoint();
+        }
+    }
+
+    private class MyJmsEndpoint extends SedaEndpoint {
+
+        public MyJmsEndpoint() {
+            super();
+        }
+
+        @Override
+        public Consumer createConsumer(Processor processor) throws Exception {
+            return new MyJmsConsumer(this, processor);
+        }
+
+        @Override
+        protected String createEndpointUri() {
+            return "jms:cheese";
+        }
+    }
+
+    private class MyJmsConsumer extends SedaConsumer {
+
+        public MyJmsConsumer(SedaEndpoint endpoint, Processor processor) {
+            super(endpoint, processor);
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            throw new IllegalArgumentException("Cannot start");
+        }
+    }
+}

Reply via email to