This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch res-event in repository https://gitbox.apache.org/repos/asf/camel.git
commit a9d2ce71bf1cb7a32b31c89b994781b4d801f18a Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Oct 6 11:11:51 2024 +0200 CAMEL-21322: camel-core: Add RouteRestartingEvent/RouteRestartingFailureEvent to keep track of route during supervising startup of routes. --- ...mppTRXProducerSupervisingRouteControllerIT.java | 20 ++++++ .../main/java/org/apache/camel/spi/CamelEvent.java | 16 +++++ .../java/org/apache/camel/spi/EventFactory.java | 21 ++++++ .../engine/DefaultSupervisingRouteController.java | 11 ++- .../camel/impl/event/DefaultEventFactory.java | 18 +++++ .../camel/impl/event/RouteRestartingEvent.java | 41 +++++++++++ .../impl/event/RouteRestartingFailureEvent.java | 55 +++++++++++++++ .../DefaultSupervisingRouteControllerTest.java | 19 ++++++ .../java/org/apache/camel/support/EventHelper.java | 79 ++++++++++++++++++++++ 9 files changed, 278 insertions(+), 2 deletions(-) diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java index 08f78639b7b..74092f2f11a 100644 --- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java +++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.smpp.integration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -29,7 +31,9 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.smpp.SmppConstants; import org.apache.camel.component.smpp.SmppMessageType; +import org.apache.camel.spi.CamelEvent; import org.apache.camel.spi.SupervisingRouteController; +import org.apache.camel.support.SimpleEventNotifierSupport; import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit5.CamelTestSupport; import org.jsmpp.examples.SMPPServerSimulator; @@ -41,6 +45,7 @@ import org.junit.jupiter.api.parallel.Isolated; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @Isolated class SmppTRXProducerSupervisingRouteControllerIT extends CamelTestSupport { @@ -63,6 +68,8 @@ class SmppTRXProducerSupervisingRouteControllerIT extends CamelTestSupport { @EndpointInject("direct:start") private Endpoint start; + private List<CamelEvent.RouteRestartingEvent> events; + @Override protected CamelContext createCamelContext() throws Exception { CamelContext context = super.createCamelContext(); @@ -73,6 +80,16 @@ class SmppTRXProducerSupervisingRouteControllerIT extends CamelTestSupport { src.setInitialDelay(100); src.setThreadPoolSize(2); + events = new ArrayList<>(); + context.getManagementStrategy().addEventNotifier(new SimpleEventNotifierSupport() { + @Override + public void notify(CamelEvent event) throws Exception { + if (event instanceof CamelEvent.RouteRestartingEvent rre) { + events.add(rre); + } + } + }); + return context; } @@ -109,6 +126,9 @@ class SmppTRXProducerSupervisingRouteControllerIT extends CamelTestSupport { assertNotNull(exchange.getIn().getHeader(SmppConstants.ID)); assertEquals(1, exchange.getIn().getHeader(SmppConstants.SENT_MESSAGE_COUNT)); + + // there should be some restart events + assertTrue(events.size() >= 3, "There should be restarting events, size: " + events.size()); } @Override diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java b/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java index d890a345f5c..301dcbde50e 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java @@ -64,6 +64,8 @@ public interface CamelEvent { RouteStarted, RouteStopping, RouteStopped, + RouteRestarting, + RouteRestartingFailure, ServiceStartupFailure, ServiceStopFailure, StepStarted, @@ -426,6 +428,20 @@ public interface CamelEvent { } } + interface RouteRestartingEvent extends RouteEvent { + @Override + default Type getType() { + return Type.RouteRestarting; + } + } + + interface RouteRestartingFailureEvent extends RouteEvent, FailureEvent { + @Override + default Type getType() { + return Type.RouteRestartingFailure; + } + } + interface ServiceEvent extends CamelEvent { Object getService(); diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java index d390f2db564..b7412a218b6 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java @@ -243,6 +243,27 @@ public interface EventFactory { */ CamelEvent createRouteReloaded(Route route, int index, int total); + /** + * Creates an {@link CamelEvent} for {@link Route} being restarted by {@link SupervisingRouteController}. + * + * @param route the route + * @param attempt the attempt number for restarting the route + * @return the restarting event + */ + CamelEvent createRouteRestarting(Route route, long attempt); + + /** + * Creates an {@link CamelEvent} for {@link Route} being restarted and failed by {@link SupervisingRouteController}. + * + * @param route the route + * @param attempt the attempt number for restarting the route + * @param cause the exception causing the failure + * @param exhausted whether the supervising controller is exhausted and will not attempt to restart this route + * anymore + * @return the restarting failure event + */ + CamelEvent createRouteRestartingFailure(Route route, long attempt, Throwable cause, boolean exhausted); + /** * Creates an {@link CamelEvent} when an {@link org.apache.camel.Exchange} has been created * diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java index 6d8e06976af..5862eaf0db4 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java @@ -50,6 +50,7 @@ import org.apache.camel.spi.RouteError; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.spi.SupervisingRouteController; +import org.apache.camel.support.EventHelper; import org.apache.camel.support.PatternHelper; import org.apache.camel.support.RoutePolicySupport; import org.apache.camel.util.ObjectHelper; @@ -672,6 +673,7 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im try { logger.info("Restarting route: {} attempt: {}", r.getId(), attempt); + EventHelper.notifyRouteRestarting(getCamelContext(), r.get(), attempt); doStartRoute(r, false, rx -> DefaultSupervisingRouteController.super.startRoute(rx.getId())); logger.info("Route: {} started after {} attempts", r.getId(), attempt); return false; @@ -681,6 +683,7 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im logger.info("Failed restarting route: {} attempt: {} due: {} (stacktrace in debug log level)", r.getId(), attempt, cause); logger.debug(" Error restarting route caused by: {}", e.getMessage(), e); + EventHelper.notifyRouteRestartingFailure(getCamelContext(), r.get(), attempt, e, false); return true; } }); @@ -698,17 +701,21 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im if (backOffTask != null && backOffTask.getStatus() == BackOffTimer.Task.Status.Exhausted && stopped) { + long attempts = backOffTask.getCurrentAttempts() - 1; LOG.warn( "Restarting route: {} is exhausted after {} attempts. No more attempts will be made" + " and the route is no longer supervised by this route controller and remains as stopped.", - route.getId(), backOffTask.getCurrentAttempts() - 1); + route.getId(), attempts); r.get().setRouteController(null); // remember exhausted routes routeManager.exhausted.put(r, task); + // store as last error on route as it was exhausted + Throwable t = getRestartException(route.getId()); + EventHelper.notifyRouteRestartingFailure(getCamelContext(), r.get(), attempts, t, true); + if (unhealthyOnExhausted) { // store as last error on route as it was exhausted - Throwable t = getRestartException(route.getId()); if (t != null) { DefaultRouteError.set(getCamelContext(), r.getId(), RouteError.Phase.START, t, true); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java b/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java index 2b53623566e..4c1695d8421 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java @@ -213,6 +213,24 @@ public class DefaultEventFactory implements EventFactory { return answer; } + @Override + public CamelEvent createRouteRestarting(Route route, long attempt) { + CamelEvent answer = new RouteRestartingEvent(route, attempt); + if (timestampEnabled) { + answer.setTimestamp(System.currentTimeMillis()); + } + return answer; + } + + @Override + public CamelEvent createRouteRestartingFailure(Route route, long attempt, Throwable cause, boolean exhausted) { + CamelEvent answer = new RouteRestartingFailureEvent(route, attempt, cause, exhausted); + if (timestampEnabled) { + answer.setTimestamp(System.currentTimeMillis()); + } + return answer; + } + @Override public CamelEvent createRouteStoppingEvent(Route route) { CamelEvent answer = new RouteStoppingEvent(route); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingEvent.java b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingEvent.java new file mode 100644 index 00000000000..8e896f4b75c --- /dev/null +++ b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingEvent.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.event; + +import java.io.Serial; + +import org.apache.camel.Route; +import org.apache.camel.spi.CamelEvent; + +public class RouteRestartingEvent extends AbstractRouteEvent implements CamelEvent.RouteRestartingEvent { + private static final @Serial long serialVersionUID = 1330257282431407330L; + private final long attempt; + + public RouteRestartingEvent(Route source, long attempt) { + super(source); + this.attempt = attempt; + } + + public long getAttempt() { + return attempt; + } + + @Override + public String toString() { + return "Route restarting: " + getRoute().getId() + " (attempt: " + attempt + ")"; + } +} diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingFailureEvent.java b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingFailureEvent.java new file mode 100644 index 00000000000..cb3e934a46f --- /dev/null +++ b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingFailureEvent.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.event; + +import java.io.Serial; + +import org.apache.camel.Route; +import org.apache.camel.spi.CamelEvent; + +public class RouteRestartingFailureEvent extends AbstractRouteEvent implements CamelEvent.RouteRestartingFailureEvent { + private static final @Serial long serialVersionUID = 1330257282431407331L; + private final long attempt; + private final Throwable cause; + private final boolean exhausted; + + public RouteRestartingFailureEvent(Route source, long attempt, Throwable cause, boolean exhausted) { + super(source); + this.attempt = attempt; + this.cause = cause; + this.exhausted = exhausted; + } + + public long getAttempt() { + return attempt; + } + + @Override + public Throwable getCause() { + return cause; + } + + public boolean isExhausted() { + return exhausted; + } + + @Override + public String toString() { + return "Route restarting " + (exhausted ? "exhausted: " : "failed: ") + getRoute().getId() + " (attempt: " + attempt + + ") due to " + cause.getMessage(); + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java index b2fe6490b8e..9e113a6edef 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.impl.engine; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -28,7 +30,10 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.seda.SedaComponent; import org.apache.camel.component.seda.SedaConsumer; import org.apache.camel.component.seda.SedaEndpoint; +import org.apache.camel.impl.event.RouteRestartingEvent; +import org.apache.camel.spi.CamelEvent; import org.apache.camel.spi.SupervisingRouteController; +import org.apache.camel.support.SimpleEventNotifierSupport; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; @@ -96,6 +101,17 @@ public class DefaultSupervisingRouteControllerTest extends ContextTestSupport { src.setInitialDelay(100); src.setThreadPoolSize(2); + List<CamelEvent.RouteRestartingEvent> events = new ArrayList<>(); + + context.getManagementStrategy().addEventNotifier(new SimpleEventNotifierSupport() { + @Override + public void notify(CamelEvent event) throws Exception { + if (event instanceof RouteRestartingEvent rre) { + events.add(rre); + } + } + }); + context.start(); MockEndpoint mock = context.getEndpoint("mock:foo", MockEndpoint.class); @@ -118,6 +134,9 @@ public class DefaultSupervisingRouteControllerTest extends ContextTestSupport { assertEquals("Started", context.getRouteController().getRouteStatus("cake").toString()); // bar is no auto startup assertEquals("Stopped", context.getRouteController().getRouteStatus("bar").toString()); + + // 2 x 5 restart attempts + assertEquals(10, events.size()); } private static class MyRoute extends RouteBuilder { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java index f694ef00563..6bc547ae6a4 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java @@ -266,6 +266,85 @@ public final class EventHelper { return answer; } + public static boolean notifyRouteRestarting(CamelContext context, Route route, long attempt) { + ManagementStrategy management = context.getManagementStrategy(); + if (management == null) { + return false; + } + + EventFactory factory = management.getEventFactory(); + if (factory == null) { + return false; + } + + List<EventNotifier> notifiers = management.getStartedEventNotifiers(); + if (notifiers == null || notifiers.isEmpty()) { + return false; + } + + boolean answer = false; + CamelEvent event = null; + for (EventNotifier notifier : notifiers) { + if (notifier.isDisabled()) { + continue; + } + if (notifier.isIgnoreRouteEvents()) { + continue; + } + + if (event == null) { + // only create event once + event = factory.createRouteRestarting(route, attempt); + if (event == null) { + // factory could not create event so exit + return false; + } + } + answer |= doNotifyEvent(notifier, event); + } + return answer; + } + + public static boolean notifyRouteRestartingFailure( + CamelContext context, Route route, long attempt, Throwable cause, boolean exhausted) { + ManagementStrategy management = context.getManagementStrategy(); + if (management == null) { + return false; + } + + EventFactory factory = management.getEventFactory(); + if (factory == null) { + return false; + } + + List<EventNotifier> notifiers = management.getStartedEventNotifiers(); + if (notifiers == null || notifiers.isEmpty()) { + return false; + } + + boolean answer = false; + CamelEvent event = null; + for (EventNotifier notifier : notifiers) { + if (notifier.isDisabled()) { + continue; + } + if (notifier.isIgnoreRouteEvents()) { + continue; + } + + if (event == null) { + // only create event once + event = factory.createRouteRestartingFailure(route, attempt, cause, exhausted); + if (event == null) { + // factory could not create event so exit + return false; + } + } + answer |= doNotifyEvent(notifier, event); + } + return answer; + } + public static boolean notifyRouteStarted(CamelContext context, Route route) { ManagementStrategy management = context.getManagementStrategy(); if (management == null) {