This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new e5c5044 CAMEL-15036: Make it easier to use supervising route controller e5c5044 is described below commit e5c50448a1f59a37383224e5113abd110cf7e496 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon May 11 11:35:37 2020 +0200 CAMEL-15036: Make it easier to use supervising route controller --- .../java/org/apache/camel/spi/RouteController.java | 15 -- .../engine/DefaultSupervisingRouteController.java | 3 +- .../DefaultSupervisingRouteControllerTest.java | 182 +++++++++++++++++++++ .../MainConfigurationPropertiesConfigurer.java | 5 +- .../camel/main/DefaultConfigurationConfigurer.java | 4 +- .../camel/main/DefaultConfigurationProperties.java | 25 ++- ...gRouteControllerFilterFailToStartRouteTest.java | 2 +- .../main/MainSupervisingRouteControllerTest.java | 65 ++++++-- 8 files changed, 261 insertions(+), 40 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RouteController.java b/core/camel-api/src/main/java/org/apache/camel/spi/RouteController.java index 4ef114f..126620b 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/RouteController.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/RouteController.java @@ -145,19 +145,4 @@ public interface RouteController extends CamelContextAware, StaticService { */ void resumeRoute(String routeId) throws Exception; - /** - * Access the underlying concrete RouteController implementation. - * - * @param clazz the proprietary class or interface of the underlying concrete RouteController. - * @return an instance of the underlying concrete RouteController as the required type. - */ - default <T extends RouteController> T unwrap(Class<T> clazz) { - if (RouteController.class.isAssignableFrom(clazz)) { - return clazz.cast(this); - } - - throw new IllegalArgumentException( - "Unable to unwrap this RouteController type (" + getClass() + ") to the required type (" + clazz + ")" - ); - } } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java index 948cd5f..b62219f 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java @@ -478,6 +478,7 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im try { logger.info("Restarting route: {} attempt: {}", r.getId(), attempt); doStartRoute(r, false, rx -> DefaultSupervisingRouteController.super.startRoute(rx.getId())); + logger.info("Route: {} started after {} attempts", r.getId(), attempt); return false; } catch (Exception e) { exceptions.put(r.getId(), e); @@ -520,7 +521,7 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im exceptions.remove(route.getId()); BackOffTimer.Task task = routes.remove(route); if (task != null) { - LOG.info("Cancel restart task for route {}", route.getId()); + LOG.info("Cancelling restart task for route: {}", route.getId()); task.cancel(); } diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java new file mode 100644 index 0000000..9cc9820 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Consumer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.seda.SedaComponent; +import org.apache.camel.component.seda.SedaConsumer; +import org.apache.camel.component.seda.SedaEndpoint; +import org.apache.camel.spi.SupervisingRouteController; +import org.junit.Test; + +public class DefaultSupervisingRouteControllerTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testSupervising() throws Exception { + // lets make a simple route + context.addRoutes(new MyRoute()); + + // configure supervising + SupervisingRouteController src = context.adapt(ExtendedCamelContext.class).getSupervisingRouteController(); + src.setBackOffDelay(25); + src.setBackOffMaxAttempts(3); + src.setInitialDelay(100); + src.setThreadPoolSize(2); + context.setRouteController(src); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:foo", MockEndpoint.class); + mock.expectedMinimumMessageCount(3); + + MockEndpoint mock2 = context.getEndpoint("mock:cheese", MockEndpoint.class); + mock2.expectedMessageCount(0); + + MockEndpoint mock3 = context.getEndpoint("mock:cake", MockEndpoint.class); + mock3.expectedMessageCount(0); + + MockEndpoint mock4 = context.getEndpoint("mock:bar", MockEndpoint.class); + mock4.expectedMessageCount(0); + + MockEndpoint.assertIsSatisfied(5, TimeUnit.SECONDS, mock, mock2, mock3, mock4); + + assertEquals("Started", context.getRouteController().getRouteStatus("foo").toString()); + // cheese was not able to start + assertEquals("Stopped", context.getRouteController().getRouteStatus("cheese").toString()); + // cake was not able to start + assertEquals("Stopped", context.getRouteController().getRouteStatus("cake").toString()); + + Throwable e = src.getRestartException("cake"); + assertNotNull(e); + assertEquals("Cannot start", e.getMessage()); + assertTrue(e instanceof IllegalArgumentException); + + // bar is no auto startup + assertEquals("Stopped", context.getRouteController().getRouteStatus("bar").toString()); + } + + @Test + public void testSupervisingOk() throws Exception { + // lets make a simple route + context.addRoutes(new MyRoute()); + + // configure supervising + SupervisingRouteController src = context.adapt(ExtendedCamelContext.class).getSupervisingRouteController(); + src.setBackOffDelay(25); + src.setBackOffMaxAttempts(10); + src.setInitialDelay(100); + src.setThreadPoolSize(2); + context.setRouteController(src); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:foo", MockEndpoint.class); + mock.expectedMinimumMessageCount(3); + + MockEndpoint mock2 = context.getEndpoint("mock:cheese", MockEndpoint.class); + mock2.expectedMessageCount(0); + + MockEndpoint mock3 = context.getEndpoint("mock:cake", MockEndpoint.class); + mock3.expectedMessageCount(0); + + MockEndpoint mock4 = context.getEndpoint("mock:bar", MockEndpoint.class); + mock4.expectedMessageCount(0); + + MockEndpoint.assertIsSatisfied(5, TimeUnit.SECONDS, mock, mock2, mock3, mock4); + + // these should all start + assertEquals("Started", context.getRouteController().getRouteStatus("foo").toString()); + assertEquals("Started", context.getRouteController().getRouteStatus("cheese").toString()); + assertEquals("Started", context.getRouteController().getRouteStatus("cake").toString()); + // bar is no auto startup + assertEquals("Stopped", context.getRouteController().getRouteStatus("bar").toString()); + } + + private class MyRoute extends RouteBuilder { + @Override + public void configure() throws Exception { + getContext().addComponent("jms", new MyJmsComponent()); + + from("timer:foo").to("mock:foo").routeId("foo"); + + from("jms:cheese").to("mock:cheese").routeId("cheese"); + + from("jms:cake").to("mock:cake").routeId("cake"); + + from("seda:bar").routeId("bar").noAutoStartup().to("mock:bar"); + } + } + + private class MyJmsComponent extends SedaComponent { + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + return new MyJmsEndpoint(remaining); + } + } + + private class MyJmsEndpoint extends SedaEndpoint { + + private String name; + + public MyJmsEndpoint(String name) { + super(); + this.name = name; + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new MyJmsConsumer(this, processor); + } + + @Override + protected String createEndpointUri() { + return "jms:" + name; + } + } + + private class MyJmsConsumer extends SedaConsumer { + + private int counter; + + public MyJmsConsumer(SedaEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + protected void doStart() throws Exception { + if (counter++ < 5) { + throw new IllegalArgumentException("Cannot start"); + } + } + } + +} diff --git a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java index 87f6333..fb8125f 100644 --- a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java +++ b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java @@ -7,7 +7,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.spi.GeneratedPropertyConfigurer; import org.apache.camel.spi.PropertyConfigurerGetter; import org.apache.camel.util.CaseInsensitiveMap; -import org.apache.camel.main.MainConfigurationProperties; /** * Generated by camel build tools - do NOT edit this file! @@ -116,7 +115,7 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "routecontrollerbackoffmultiplier": case "RouteControllerBackOffMultiplier": target.setRouteControllerBackOffMultiplier(property(camelContext, double.class, value)); return true; case "routecontrollerenabled": - case "RouteControllerEnabled": target.setRouteControllerEnabled(property(camelContext, boolean.class, value)); return true; + case "RouteControllerEnabled": target.setRouteControllerSuperviseEnabled(property(camelContext, boolean.class, value)); return true; case "routecontrollerexcluderoutes": case "RouteControllerExcludeRoutes": target.setRouteControllerExcludeRoutes(property(camelContext, java.lang.String.class, value)); return true; case "routecontrollerincluderoutes": @@ -373,7 +372,7 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "routecontrollerbackoffmultiplier": case "RouteControllerBackOffMultiplier": return target.getRouteControllerBackOffMultiplier(); case "routecontrollerenabled": - case "RouteControllerEnabled": return target.isRouteControllerEnabled(); + case "RouteControllerEnabled": return target.isRouteControllerSuperviseEnabled(); case "routecontrollerexcluderoutes": case "RouteControllerExcludeRoutes": return target.getRouteControllerExcludeRoutes(); case "routecontrollerincluderoutes": diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java index 38fda13..9ef846f 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java @@ -178,8 +178,8 @@ public final class DefaultConfigurationConfigurer { camelContext.getExtension(Model.class).setRouteFilterPattern(config.getRouteFilterIncludePattern(), config.getRouteFilterExcludePattern()); } - // supervisting route controller - if (config.isRouteControllerEnabled()) { + // supervising route controller + if (config.isRouteControllerSuperviseEnabled()) { SupervisingRouteController src = camelContext.adapt(ExtendedCamelContext.class).getSupervisingRouteController(); src.setCamelContext(camelContext); if (config.getRouteControllerIncludeRoutes() != null) { diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java index 42e80aa..a8cd975 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java @@ -81,7 +81,7 @@ public abstract class DefaultConfigurationProperties<T> { private String xmlRoutes = "classpath:camel/*.xml"; private String xmlRests = "classpath:camel-rest/*.xml"; private boolean lightweight; - private boolean routeControllerEnabled; + private boolean routeControllerSuperviseEnabled; private String routeControllerIncludeRoutes; private String routeControllerExcludeRoutes; private int routeControllerThreadPoolSize; @@ -900,8 +900,8 @@ public abstract class DefaultConfigurationProperties<T> { this.lightweight = lightweight; } - public boolean isRouteControllerEnabled() { - return routeControllerEnabled; + public boolean isRouteControllerSuperviseEnabled() { + return routeControllerSuperviseEnabled; } /** @@ -914,8 +914,8 @@ public abstract class DefaultConfigurationProperties<T> { * using a background thread. The controller allows to be configured with various * settings to attempt to restart failing routes. */ - public void setRouteControllerEnabled(boolean routeControllerEnabled) { - this.routeControllerEnabled = routeControllerEnabled; + public void setRouteControllerSuperviseEnabled(boolean routeControllerSuperviseEnabled) { + this.routeControllerSuperviseEnabled = routeControllerSuperviseEnabled; } public String getRouteControllerIncludeRoutes() { @@ -1699,8 +1699,8 @@ public abstract class DefaultConfigurationProperties<T> { * using a background thread. The controller allows to be configured with various * settings to attempt to restart failing routes. */ - public T withRouteControllerEnabled(boolean routeControllerEnabled) { - this.routeControllerEnabled = routeControllerEnabled; + public T withRouteControllerSuperviseEnabled(boolean routeControllerSuperviseEnabled) { + this.routeControllerSuperviseEnabled = routeControllerSuperviseEnabled; return (T) this; } @@ -1757,4 +1757,15 @@ public abstract class DefaultConfigurationProperties<T> { return (T) this; } + /** + * The number of threads used by the route controller scheduled thread pool that are used for restarting + * routes. The pool uses 1 thread by default, but you can increase this to allow the controller + * to concurrently attempt to restart multiple routes in case more than one route has problems + * starting. + */ + public T withRouteControllerThreadPoolSize(int routeControllerThreadPoolSize) { + this.routeControllerThreadPoolSize = routeControllerThreadPoolSize; + return (T) this; + } + } diff --git a/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerFilterFailToStartRouteTest.java b/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerFilterFailToStartRouteTest.java index c1fae8f..6f064e1 100644 --- a/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerFilterFailToStartRouteTest.java +++ b/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerFilterFailToStartRouteTest.java @@ -36,7 +36,7 @@ public class MainSupervisingRouteControllerFilterFailToStartRouteTest extends As // lets make a simple route Main main = new Main(); main.configure().addRoutesBuilder(new MyRoute()); - main.configure().setRouteControllerEnabled(true); + main.configure().setRouteControllerSuperviseEnabled(true); main.configure().setRouteControllerBackOffDelay(250); main.configure().setRouteControllerBackOffMaxAttempts(3); main.configure().setRouteControllerInitialDelay(1000); diff --git a/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerTest.java b/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerTest.java index ff88fbd..76124b6 100644 --- a/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerTest.java +++ b/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerTest.java @@ -38,12 +38,11 @@ public class MainSupervisingRouteControllerTest extends Assert { // lets make a simple route Main main = new Main(); main.configure().addRoutesBuilder(new MyRoute()); - main.configure().setRouteControllerEnabled(true); - main.configure().setRouteControllerBackOffDelay(250); - main.configure().setRouteControllerBackOffMaxAttempts(3); - main.configure().setRouteControllerInitialDelay(1000); - main.configure().setRouteControllerThreadPoolSize(2); - + main.configure().withRouteControllerSuperviseEnabled(true) + .withRouteControllerBackOffDelay(25) + .withRouteControllerBackOffMaxAttempts(3) + .withRouteControllerInitialDelay(100) + .withRouteControllerThreadPoolSize(2); main.start(); MockEndpoint mock = main.getCamelContext().getEndpoint("mock:foo", MockEndpoint.class); @@ -58,7 +57,7 @@ public class MainSupervisingRouteControllerTest extends Assert { MockEndpoint mock4 = main.getCamelContext().getEndpoint("mock:bar", MockEndpoint.class); mock4.expectedMessageCount(0); - MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock, mock2, mock3, mock4); + MockEndpoint.assertIsSatisfied(5, TimeUnit.SECONDS, mock, mock2, mock3, mock4); assertEquals("Started", main.camelContext.getRouteController().getRouteStatus("foo").toString()); // cheese was not able to start @@ -78,6 +77,43 @@ public class MainSupervisingRouteControllerTest extends Assert { main.stop(); } + @Test + public void testMainOk() throws Exception { + // lets make a simple route + Main main = new Main(); + main.configure().addRoutesBuilder(new MyRoute()); + main.configure().setRouteControllerSuperviseEnabled(true); + main.configure().setRouteControllerBackOffDelay(25); + main.configure().setRouteControllerBackOffMaxAttempts(10); + main.configure().setRouteControllerInitialDelay(100); + main.configure().setRouteControllerThreadPoolSize(2); + + main.start(); + + MockEndpoint mock = main.getCamelContext().getEndpoint("mock:foo", MockEndpoint.class); + mock.expectedMinimumMessageCount(3); + + MockEndpoint mock2 = main.getCamelContext().getEndpoint("mock:cheese", MockEndpoint.class); + mock2.expectedMessageCount(0); + + MockEndpoint mock3 = main.getCamelContext().getEndpoint("mock:cake", MockEndpoint.class); + mock3.expectedMessageCount(0); + + MockEndpoint mock4 = main.getCamelContext().getEndpoint("mock:bar", MockEndpoint.class); + mock4.expectedMessageCount(0); + + MockEndpoint.assertIsSatisfied(5, TimeUnit.SECONDS, mock, mock2, mock3, mock4); + + // these should all start + assertEquals("Started", main.camelContext.getRouteController().getRouteStatus("foo").toString()); + assertEquals("Started", main.camelContext.getRouteController().getRouteStatus("cheese").toString()); + assertEquals("Started", main.camelContext.getRouteController().getRouteStatus("cake").toString()); + // bar is no auto startup + assertEquals("Stopped", main.camelContext.getRouteController().getRouteStatus("bar").toString()); + + main.stop(); + } + private class MyRoute extends RouteBuilder { @Override public void configure() throws Exception { @@ -97,14 +133,17 @@ public class MainSupervisingRouteControllerTest extends Assert { @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - return new MyJmsEndpoint(); + return new MyJmsEndpoint(remaining); } } private class MyJmsEndpoint extends SedaEndpoint { - public MyJmsEndpoint() { + private String name; + + public MyJmsEndpoint(String name) { super(); + this.name = name; } @Override @@ -114,19 +153,23 @@ public class MainSupervisingRouteControllerTest extends Assert { @Override protected String createEndpointUri() { - return "jms:cheese"; + return "jms:" + name; } } private class MyJmsConsumer extends SedaConsumer { + private int counter; + public MyJmsConsumer(SedaEndpoint endpoint, Processor processor) { super(endpoint, processor); } @Override protected void doStart() throws Exception { - throw new IllegalArgumentException("Cannot start"); + if (counter++ < 5) { + throw new IllegalArgumentException("Cannot start"); + } } }