Repository: camel Updated Branches: refs/heads/master 57299969c -> a613a589e
CAMEL-11579: Add unit test / example for SupervisingRouteController Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a613a589 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a613a589 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a613a589 Branch: refs/heads/master Commit: a613a589e9f15062f973dd426aca8d490b308398 Parents: 5729996 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Tue Aug 8 14:56:21 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Wed Aug 9 12:04:47 2017 +0200 ---------------------------------------------------------------------- .../org/apache/camel/spi/RouteController.java | 16 ++ .../camel/util/backoff/BackOffContext.java | 7 +- .../apache/camel/util/backoff/BackOffTimer.java | 13 +- .../SupervisingRouteControllerRestartTest.java | 146 +++++++++++++++++++ .../boot/SupervisingRouteControllerTest.java | 126 ++++++++++++++++ 5 files changed, 300 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a613a589/camel-core/src/main/java/org/apache/camel/spi/RouteController.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/RouteController.java b/camel-core/src/main/java/org/apache/camel/spi/RouteController.java index d19a8a2..b772a7c 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/RouteController.java +++ b/camel-core/src/main/java/org/apache/camel/spi/RouteController.java @@ -46,4 +46,20 @@ public interface RouteController extends CamelContextAware, Service { void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception; void resumeRoute(String routeId) throws Exception; + + /** + * Access the underlying concrete RouteController implementation. + * + * @param clazz the proprietary class or interface of the underlying concrete RouteController. + * @return an instance of the underlying concrete RouteController as the required type. + */ + default <T extends RouteController> T unwrap(Class<T> clazz) { + if (RouteController.class.isAssignableFrom(clazz)) { + return clazz.cast(this); + } + + throw new IllegalArgumentException( + "Unable to unwrap this RouteController type (" + getClass() + ") to the required type (" + clazz + ")" + ); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/a613a589/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java index cd46653..617f0a8 100644 --- a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java +++ b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java @@ -28,6 +28,7 @@ public final class BackOffContext { public BackOffContext(BackOff backOff) { this.backOff = backOff; + this.currentAttempts = 0; this.currentDelay = backOff.getDelay().toMillis(); this.currentElapsedTime = 0; @@ -40,7 +41,7 @@ public final class BackOffContext { /** * The back-off associated with this context. */ - public BackOff backOff() { + public BackOff getBackOff() { return backOff; } @@ -81,7 +82,7 @@ public final class BackOffContext { * or ${@link BackOff#NEVER} to indicate that no further attempt should be * made. */ - public long next() { + long next() { // A call to next when currentDelay is set to NEVER has no effects // as this means that either the timer is exhausted or it has explicit // stopped @@ -95,7 +96,7 @@ public final class BackOffContext { currentDelay = BackOff.NEVER; } else { if (currentDelay <= backOff.getMaxDelay().toMillis()) { - currentDelay = (long) (currentDelay * backOff().getMultiplier()); + currentDelay = (long) (currentDelay * backOff.getMultiplier()); } currentElapsedTime += currentDelay; http://git-wip-us.apache.org/repos/asf/camel/blob/a613a589/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java index c0c035f..dbae257 100644 --- a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java +++ b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java @@ -38,10 +38,9 @@ public class BackOffTimer { * according to the given backOff. */ public CompletableFuture<BackOffContext> schedule(BackOff backOff, ThrowingFunction<BackOffContext, Boolean, Exception> function) { - final BackOffContext context = new BackOffContext(backOff); - final Task task = new Task(context, function); + final Task task = new Task(backOff, function); - long delay = context.next(); + long delay = task.getContext().next(); if (delay != BackOff.NEVER) { scheduler.schedule(task, delay, TimeUnit.MILLISECONDS); } else { @@ -59,8 +58,8 @@ public class BackOffTimer { private final BackOffContext context; private final ThrowingFunction<BackOffContext, Boolean, Exception> function; - Task(BackOffContext context, ThrowingFunction<BackOffContext, Boolean, Exception> function) { - this.context = context; + Task(BackOff backOff, ThrowingFunction<BackOffContext, Boolean, Exception> function) { + this.context = new BackOffContext(backOff); this.function = function; } @@ -100,5 +99,9 @@ public class BackOffTimer { boolean complete() { return super.complete(context); } + + BackOffContext getContext() { + return context; + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/a613a589/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerRestartTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerRestartTest.java b/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerRestartTest.java new file mode 100644 index 0000000..74c8f09 --- /dev/null +++ b/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerRestartTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.boot; + +import java.net.BindException; +import java.net.ServerSocket; + +import org.apache.camel.CamelContext; +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.SupervisingRouteController; +import org.apache.camel.test.AvailablePortFinder; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +@DirtiesContext +@RunWith(SpringRunner.class) +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SupervisingRouteControllerAutoConfiguration.class, + SupervisingRouteControllerRestartTest.TestConfiguration.class + }, + properties = { + "camel.springboot.xml-routes = false", + "camel.springboot.main-run-controller = true", + "camel.supervising.controller.enabled = true", + "camel.supervising.controller.initial-delay = 2s", + "camel.supervising.controller.default-back-off.delay = 1s", + "camel.supervising.controller.default-back-off.max-attempts = 10", + "camel.supervising.controller.routes.bar.back-off.delay = 10s", + "camel.supervising.controller.routes.bar.back-off.max-attempts = 3", + "camel.supervising.controller.routes.timer-unmanaged.supervise = false" + } +) +public class SupervisingRouteControllerRestartTest { + @Autowired + private CamelContext context; + + @Test + public void test() throws Exception { + Assert.assertNotNull(context.getRouteController()); + Assert.assertTrue(context.getRouteController() instanceof SupervisingRouteController); + + SupervisingRouteController controller = context.getRouteController().unwrap(SupervisingRouteController.class); + + // Wait for the controller to start the routes + Thread.sleep(2500); + + Assert.assertEquals(ServiceStatus.Started, context.getRouteStatus("foo")); + Assert.assertEquals(ServiceStatus.Started, context.getRouteStatus("bar")); + Assert.assertEquals(ServiceStatus.Started, context.getRouteStatus("jetty")); + + // Wait a little + Thread.sleep(250); + + controller.stopRoute("jetty"); + + Assert.assertNull(context.getRoute("jetty").getRouteContext().getRouteController()); + + // bind the port so starting the route jetty should fail + ServerSocket socket = new ServerSocket(TestConfiguration.PORT); + + try { + controller.startRoute("jetty"); + } catch (Exception e) { + assertThat(e).isInstanceOf(BindException.class); + } + + // Wait for at lest one restart attempt. + Thread.sleep(2000); + + try { + socket.close(); + } catch (Exception e) { + fail("Failed to close server socket", e); + } + + // Wait for wile to give time to the controller to start the route + Thread.sleep(1500); + + Assert.assertEquals(ServiceStatus.Started, context.getRouteStatus("jetty")); + Assert.assertNotNull(context.getRoute("jetty").getRouteContext().getRouteController()); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + private static final int PORT = AvailablePortFinder.getNextAvailable(); + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:foo?period=5s") + .id("foo") + .startupOrder(2) + .to("mock:foo"); + from("timer:bar?period=5s") + .id("bar") + .startupOrder(1) + .to("mock:bar"); + from("timer:unmanaged?period=5s") + .id("timer-unmanaged") + .to("mock:timer-unmanaged"); + from("timer:no-autostartup?period=5s") + .id("timer-no-autostartup") + .autoStartup(false) + .to("mock:timer-no-autostartup"); + + fromF("jetty:http://localhost:%d", PORT) + .id("jetty") + .to("mock:jetty"); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a613a589/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerTest.java b/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerTest.java new file mode 100644 index 0000000..b1a43f0 --- /dev/null +++ b/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerTest.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.boot; + +import org.apache.camel.CamelContext; +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.SupervisingRouteController; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.util.backoff.BackOff; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +@DirtiesContext +@RunWith(SpringRunner.class) +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SupervisingRouteControllerAutoConfiguration.class, + SupervisingRouteControllerTest.TestConfiguration.class + }, + properties = { + "camel.springboot.xml-routes = false", + "camel.springboot.main-run-controller = true", + "camel.supervising.controller.enabled = true", + "camel.supervising.controller.initial-delay = 2s", + "camel.supervising.controller.default-back-off.delay = 1s", + "camel.supervising.controller.default-back-off.max-attempts = 10", + "camel.supervising.controller.routes.bar.back-off.delay = 10s", + "camel.supervising.controller.routes.bar.back-off.max-attempts = 3", + "camel.supervising.controller.routes.timer-unmanaged.supervise = false" + } +) +public class SupervisingRouteControllerTest { + @Autowired + private CamelContext context; + + @Test + public void test() throws Exception { + Assert.assertNotNull(context.getRouteController()); + Assert.assertTrue(context.getRouteController() instanceof SupervisingRouteController); + + SupervisingRouteController controller = context.getRouteController().unwrap(SupervisingRouteController.class); + + Assert.assertEquals(3, controller.getControlledRoutes().size()); + Assert.assertEquals(2, controller.getInitialDelay().getSeconds()); + + // Route foo + BackOff foo = controller.getBackOff("foo"); + Assert.assertEquals(1, foo.getDelay().getSeconds()); + Assert.assertEquals(Long.MAX_VALUE, foo.getMaxDelay().toMillis()); + Assert.assertEquals(10L, foo.getMaxAttempts().longValue()); + + // Route bar + BackOff bar = controller.getBackOff("bar"); + Assert.assertEquals(10, bar.getDelay().getSeconds()); + Assert.assertEquals(Long.MAX_VALUE, bar.getMaxDelay().toMillis()); + Assert.assertEquals(3L, bar.getMaxAttempts().longValue()); + + Assert.assertEquals(controller, context.getRoute("foo").getRouteContext().getRouteController()); + Assert.assertEquals(controller, context.getRoute("bar").getRouteContext().getRouteController()); + Assert.assertNull(context.getRoute("timer-unmanaged").getRouteContext().getRouteController()); + Assert.assertNull(context.getRoute("timer-no-autostartup").getRouteContext().getRouteController()); + + Assert.assertEquals(ServiceStatus.Stopped, context.getRouteStatus("foo")); + Assert.assertEquals(ServiceStatus.Stopped, context.getRouteStatus("bar")); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + private static final int PORT = AvailablePortFinder.getNextAvailable(); + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:foo?period=5s") + .id("foo") + .startupOrder(2) + .to("mock:foo"); + from("timer:bar?period=5s") + .id("bar") + .startupOrder(1) + .to("mock:bar"); + from("timer:unmanaged?period=5s") + .id("timer-unmanaged") + .to("mock:timer-unmanaged"); + from("timer:no-autostartup?period=5s") + .id("timer-no-autostartup") + .autoStartup(false) + .to("mock:timer-no-autostartup"); + + fromF("jetty:http://localhost:%d", PORT) + .id("jetty") + .to("mock:jetty"); + } + }; + } + } +}