This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch res-event
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a9d2ce71bf1cb7a32b31c89b994781b4d801f18a
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Oct 6 11:11:51 2024 +0200

    CAMEL-21322: camel-core: Add RouteRestartingEvent/RouteRestartingFailureEvent to keep track of routes while the supervising route controller is restarting them.
---
 ...mppTRXProducerSupervisingRouteControllerIT.java | 20 ++++++
 .../main/java/org/apache/camel/spi/CamelEvent.java | 16 +++++
 .../java/org/apache/camel/spi/EventFactory.java    | 21 ++++++
 .../engine/DefaultSupervisingRouteController.java  | 11 ++-
 .../camel/impl/event/DefaultEventFactory.java      | 18 +++++
 .../camel/impl/event/RouteRestartingEvent.java     | 41 +++++++++++
 .../impl/event/RouteRestartingFailureEvent.java    | 55 +++++++++++++++
 .../DefaultSupervisingRouteControllerTest.java     | 19 ++++++
 .../java/org/apache/camel/support/EventHelper.java | 79 ++++++++++++++++++++++
 9 files changed, 278 insertions(+), 2 deletions(-)

diff --git 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java
 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java
index 08f78639b7b..74092f2f11a 100644
--- 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java
+++ 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.smpp.integration;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -29,7 +31,9 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.smpp.SmppConstants;
 import org.apache.camel.component.smpp.SmppMessageType;
+import org.apache.camel.spi.CamelEvent;
 import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.support.SimpleEventNotifierSupport;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.jsmpp.examples.SMPPServerSimulator;
@@ -41,6 +45,7 @@ import org.junit.jupiter.api.parallel.Isolated;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Isolated
 class SmppTRXProducerSupervisingRouteControllerIT extends CamelTestSupport {
@@ -63,6 +68,8 @@ class SmppTRXProducerSupervisingRouteControllerIT extends 
CamelTestSupport {
     @EndpointInject("direct:start")
     private Endpoint start;
 
+    private List<CamelEvent.RouteRestartingEvent> events;
+
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
@@ -73,6 +80,16 @@ class SmppTRXProducerSupervisingRouteControllerIT extends 
CamelTestSupport {
         src.setInitialDelay(100);
         src.setThreadPoolSize(2);
 
+        events = new ArrayList<>();
+        context.getManagementStrategy().addEventNotifier(new 
SimpleEventNotifierSupport() {
+            @Override
+            public void notify(CamelEvent event) throws Exception {
+                if (event instanceof CamelEvent.RouteRestartingEvent rre) {
+                    events.add(rre);
+                }
+            }
+        });
+
         return context;
     }
 
@@ -109,6 +126,9 @@ class SmppTRXProducerSupervisingRouteControllerIT extends 
CamelTestSupport {
 
         assertNotNull(exchange.getIn().getHeader(SmppConstants.ID));
         assertEquals(1, 
exchange.getIn().getHeader(SmppConstants.SENT_MESSAGE_COUNT));
+
+        // there should be some restart events
+        assertTrue(events.size() >= 3, "There should be restarting events, 
size: " + events.size());
     }
 
     @Override
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
index d890a345f5c..301dcbde50e 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
@@ -64,6 +64,8 @@ public interface CamelEvent {
         RouteStarted,
         RouteStopping,
         RouteStopped,
+        RouteRestarting,
+        RouteRestartingFailure,
         ServiceStartupFailure,
         ServiceStopFailure,
         StepStarted,
@@ -426,6 +428,20 @@ public interface CamelEvent {
         }
     }
 
+    /**
+     * Event for a {@link Route} that is being restarted.
+     */
+    interface RouteRestartingEvent extends RouteEvent {
+
+        /**
+         * The attempt number for restarting the route.
+         */
+        long getAttempt();
+
+        @Override
+        default Type getType() {
+            return Type.RouteRestarting;
+        }
+    }
+
+    /**
+     * Event for a {@link Route} that failed while being restarted.
+     */
+    interface RouteRestartingFailureEvent extends RouteEvent, FailureEvent {
+
+        /**
+         * The attempt number for restarting the route.
+         */
+        long getAttempt();
+
+        /**
+         * Whether restarting is exhausted and no more attempts to restart the route will be made.
+         */
+        boolean isExhausted();
+
+        @Override
+        default Type getType() {
+            return Type.RouteRestartingFailure;
+        }
+    }
+
     interface ServiceEvent extends CamelEvent {
 
         Object getService();
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
index d390f2db564..b7412a218b6 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
@@ -243,6 +243,27 @@ public interface EventFactory {
      */
     CamelEvent createRouteReloaded(Route route, int index, int total);
 
+    /**
+     * Creates a {@link CamelEvent} for a {@link Route} that is being restarted by the
+     * {@link SupervisingRouteController}.
+     *
+     * @param  route   the route
+     * @param  attempt the attempt number for restarting the route
+     * @return         the restarting event
+     */
+    CamelEvent createRouteRestarting(Route route, long attempt);
+
+    /**
+     * Creates a {@link CamelEvent} for a {@link Route} that failed while being restarted by the
+     * {@link SupervisingRouteController}.
+     *
+     * @param  route     the route
+     * @param  attempt   the attempt number for restarting the route
+     * @param  cause     the exception causing the failure (NOTE: may be <tt>null</tt> for the exhausted event, as the
+     *                   supervising controller null-checks the value it passes here - confirm with the caller)
+     * @param  exhausted whether the supervising controller is exhausted and will not attempt to restart this route
+     *                   anymore
+     * @return           the restarting failure event
+     */
+    CamelEvent createRouteRestartingFailure(Route route, long attempt, Throwable cause, boolean exhausted);
+
     /**
      * Creates an {@link CamelEvent} when an {@link org.apache.camel.Exchange} 
has been created
      *
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
index 6d8e06976af..5862eaf0db4 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
@@ -50,6 +50,7 @@ import org.apache.camel.spi.RouteError;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.RoutePolicyFactory;
 import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.PatternHelper;
 import org.apache.camel.support.RoutePolicySupport;
 import org.apache.camel.util.ObjectHelper;
@@ -672,6 +673,7 @@ public class DefaultSupervisingRouteController extends 
DefaultRouteController im
 
                             try {
                                 logger.info("Restarting route: {} attempt: 
{}", r.getId(), attempt);
+                                
EventHelper.notifyRouteRestarting(getCamelContext(), r.get(), attempt);
                                 doStartRoute(r, false, rx -> 
DefaultSupervisingRouteController.super.startRoute(rx.getId()));
                                 logger.info("Route: {} started after {} 
attempts", r.getId(), attempt);
                                 return false;
@@ -681,6 +683,7 @@ public class DefaultSupervisingRouteController extends 
DefaultRouteController im
                                 logger.info("Failed restarting route: {} 
attempt: {} due: {} (stacktrace in debug log level)",
                                         r.getId(), attempt, cause);
                                 logger.debug("    Error restarting route 
caused by: {}", e.getMessage(), e);
+                                
EventHelper.notifyRouteRestartingFailure(getCamelContext(), r.get(), attempt, 
e, false);
                                 return true;
                             }
                         });
@@ -698,17 +701,21 @@ public class DefaultSupervisingRouteController extends 
DefaultRouteController im
 
                                     if (backOffTask != null && 
backOffTask.getStatus() == BackOffTimer.Task.Status.Exhausted
                                             && stopped) {
+                                        long attempts = 
backOffTask.getCurrentAttempts() - 1;
                                         LOG.warn(
                                                 "Restarting route: {} is 
exhausted after {} attempts. No more attempts will be made"
                                                  + " and the route is no 
longer supervised by this route controller and remains as stopped.",
-                                                route.getId(), 
backOffTask.getCurrentAttempts() - 1);
+                                                route.getId(), attempts);
                                         r.get().setRouteController(null);
                                         // remember exhausted routes
                                         routeManager.exhausted.put(r, task);
 
+                                        // look up the last restart failure and notify that restarting is exhausted
+                                        Throwable t = 
getRestartException(route.getId());
+                                        
EventHelper.notifyRouteRestartingFailure(getCamelContext(), r.get(), attempts, 
t, true);
+
                                         if (unhealthyOnExhausted) {
                                             // store as last error on route as 
it was exhausted
-                                            Throwable t = 
getRestartException(route.getId());
                                             if (t != null) {
                                                 
DefaultRouteError.set(getCamelContext(), r.getId(), RouteError.Phase.START, t,
                                                         true);
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
index 2b53623566e..4c1695d8421 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
@@ -213,6 +213,24 @@ public class DefaultEventFactory implements EventFactory {
         return answer;
     }
 
+    /**
+     * Creates the event for a route that is being restarted. The event is stamped with the current time when
+     * timestamps are enabled on this factory.
+     */
+    @Override
+    public CamelEvent createRouteRestarting(Route route, long attempt) {
+        final CamelEvent event = new RouteRestartingEvent(route, attempt);
+        if (!timestampEnabled) {
+            return event;
+        }
+        event.setTimestamp(System.currentTimeMillis());
+        return event;
+    }
+
+    /**
+     * Creates the event for a route that failed while being restarted. The event is stamped with the current time
+     * when timestamps are enabled on this factory.
+     */
+    @Override
+    public CamelEvent createRouteRestartingFailure(Route route, long attempt, Throwable cause, boolean exhausted) {
+        final CamelEvent event = new RouteRestartingFailureEvent(route, attempt, cause, exhausted);
+        if (!timestampEnabled) {
+            return event;
+        }
+        event.setTimestamp(System.currentTimeMillis());
+        return event;
+    }
+
     @Override
     public CamelEvent createRouteStoppingEvent(Route route) {
         CamelEvent answer = new RouteStoppingEvent(route);
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingEvent.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingEvent.java
new file mode 100644
index 00000000000..8e896f4b75c
--- /dev/null
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.event;
+
+import java.io.Serial;
+
+import org.apache.camel.Route;
+import org.apache.camel.spi.CamelEvent;
+
+/**
+ * Route event that carries the attempt number of a route restart.
+ */
+public class RouteRestartingEvent extends AbstractRouteEvent implements CamelEvent.RouteRestartingEvent {
+    private static final @Serial long serialVersionUID = 1330257282431407330L;
+    private final long attempt;
+
+    /**
+     * @param source  the route being restarted
+     * @param attempt the attempt number for restarting the route
+     */
+    public RouteRestartingEvent(Route source, long attempt) {
+        super(source);
+        this.attempt = attempt;
+    }
+
+    /**
+     * The attempt number this event was created with.
+     */
+    public long getAttempt() {
+        return this.attempt;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("Route restarting: ");
+        text.append(getRoute().getId());
+        text.append(" (attempt: ").append(attempt).append(")");
+        return text.toString();
+    }
+}
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingFailureEvent.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingFailureEvent.java
new file mode 100644
index 00000000000..cb3e934a46f
--- /dev/null
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingFailureEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.event;
+
+import java.io.Serial;
+
+import org.apache.camel.Route;
+import org.apache.camel.spi.CamelEvent;
+
+/**
+ * Route event that carries the details of a failed route restart: the attempt number, the cause of the failure and
+ * whether restarting is exhausted.
+ */
+public class RouteRestartingFailureEvent extends AbstractRouteEvent implements CamelEvent.RouteRestartingFailureEvent {
+    private static final @Serial long serialVersionUID = 1330257282431407331L;
+    private final long attempt;
+    private final Throwable cause;
+    private final boolean exhausted;
+
+    /**
+     * @param source    the route that failed to restart
+     * @param attempt   the attempt number for restarting the route
+     * @param cause     the exception causing the failure, may be <tt>null</tt>
+     * @param exhausted whether no more attempts to restart the route will be made
+     */
+    public RouteRestartingFailureEvent(Route source, long attempt, Throwable cause, boolean exhausted) {
+        super(source);
+        this.attempt = attempt;
+        this.cause = cause;
+        this.exhausted = exhausted;
+    }
+
+    /**
+     * The attempt number this event was created with.
+     */
+    public long getAttempt() {
+        return attempt;
+    }
+
+    @Override
+    public Throwable getCause() {
+        return cause;
+    }
+
+    /**
+     * Whether this event was created as exhausted.
+     */
+    public boolean isExhausted() {
+        return exhausted;
+    }
+
+    @Override
+    public String toString() {
+        String answer = "Route restarting " + (exhausted ? "exhausted: " : "failed: ") + getRoute().getId()
+                        + " (attempt: " + attempt + ")";
+        // the cause can be null (the exhausted event is created from a looked up exception that may be absent),
+        // so guard against NPE as toString is commonly used by event notifiers for logging
+        if (cause != null) {
+            answer += " due to " + cause.getMessage();
+        }
+        return answer;
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java
index b2fe6490b8e..9e113a6edef 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.impl.engine;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -28,7 +30,10 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.seda.SedaComponent;
 import org.apache.camel.component.seda.SedaConsumer;
 import org.apache.camel.component.seda.SedaEndpoint;
+import org.apache.camel.impl.event.RouteRestartingEvent;
+import org.apache.camel.spi.CamelEvent;
 import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.support.SimpleEventNotifierSupport;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.*;
@@ -96,6 +101,17 @@ public class DefaultSupervisingRouteControllerTest extends 
ContextTestSupport {
         src.setInitialDelay(100);
         src.setThreadPoolSize(2);
 
+        List<CamelEvent.RouteRestartingEvent> events = new ArrayList<>();
+
+        context.getManagementStrategy().addEventNotifier(new 
SimpleEventNotifierSupport() {
+            @Override
+            public void notify(CamelEvent event) throws Exception {
+                if (event instanceof RouteRestartingEvent rre) {
+                    events.add(rre);
+                }
+            }
+        });
+
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:foo", 
MockEndpoint.class);
@@ -118,6 +134,9 @@ public class DefaultSupervisingRouteControllerTest extends 
ContextTestSupport {
         assertEquals("Started", 
context.getRouteController().getRouteStatus("cake").toString());
         // bar is no auto startup
         assertEquals("Stopped", 
context.getRouteController().getRouteStatus("bar").toString());
+
+        // 2 x 5 restart attempts
+        assertEquals(10, events.size());
     }
 
     private static class MyRoute extends RouteBuilder {
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java 
b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
index f694ef00563..6bc547ae6a4 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
@@ -266,6 +266,85 @@ public final class EventHelper {
         return answer;
     }
 
+    /**
+     * Notifies the started event notifiers that the given route is being restarted.
+     * <p/>
+     * Notifiers that are disabled or ignore route events are skipped. The event is created lazily, and only once, via
+     * the {@link EventFactory} of the management strategy.
+     *
+     * @param  context the camel context
+     * @param  route   the route being restarted
+     * @param  attempt the attempt number for restarting the route
+     * @return         <tt>true</tt> if notifying reported <tt>true</tt> for at least one notifier, <tt>false</tt>
+     *                 otherwise (including when there is no management strategy, event factory, started notifiers,
+     *                 or the factory could not create the event)
+     */
+    public static boolean notifyRouteRestarting(CamelContext context, Route route, long attempt) {
+        final ManagementStrategy strategy = context.getManagementStrategy();
+        final EventFactory eventFactory = strategy != null ? strategy.getEventFactory() : null;
+        if (eventFactory == null) {
+            return false;
+        }
+
+        final List<EventNotifier> started = strategy.getStartedEventNotifiers();
+        if (started == null || started.isEmpty()) {
+            return false;
+        }
+
+        CamelEvent restarting = null;
+        boolean notified = false;
+        for (EventNotifier candidate : started) {
+            if (candidate.isDisabled() || candidate.isIgnoreRouteEvents()) {
+                continue;
+            }
+
+            if (restarting == null) {
+                // only create event once
+                restarting = eventFactory.createRouteRestarting(route, attempt);
+            }
+            if (restarting == null) {
+                // factory could not create event so exit
+                return false;
+            }
+            if (doNotifyEvent(candidate, restarting)) {
+                notified = true;
+            }
+        }
+        return notified;
+    }
+
+    /**
+     * Notifies the started event notifiers that restarting the given route failed.
+     * <p/>
+     * Notifiers that are disabled or ignore route events are skipped. The event is created lazily, and only once, via
+     * the {@link EventFactory} of the management strategy.
+     *
+     * @param  context   the camel context
+     * @param  route     the route that failed to restart
+     * @param  attempt   the attempt number for restarting the route
+     * @param  cause     the exception causing the failure
+     * @param  exhausted whether no more attempts to restart the route will be made
+     * @return           <tt>true</tt> if notifying reported <tt>true</tt> for at least one notifier, <tt>false</tt>
+     *                   otherwise (including when there is no management strategy, event factory, started
+     *                   notifiers, or the factory could not create the event)
+     */
+    public static boolean notifyRouteRestartingFailure(
+            CamelContext context, Route route, long attempt, Throwable cause, boolean exhausted) {
+        final ManagementStrategy strategy = context.getManagementStrategy();
+        final EventFactory eventFactory = strategy != null ? strategy.getEventFactory() : null;
+        if (eventFactory == null) {
+            return false;
+        }
+
+        final List<EventNotifier> started = strategy.getStartedEventNotifiers();
+        if (started == null || started.isEmpty()) {
+            return false;
+        }
+
+        CamelEvent failure = null;
+        boolean notified = false;
+        for (EventNotifier candidate : started) {
+            if (candidate.isDisabled() || candidate.isIgnoreRouteEvents()) {
+                continue;
+            }
+
+            if (failure == null) {
+                // only create event once
+                failure = eventFactory.createRouteRestartingFailure(route, attempt, cause, exhausted);
+            }
+            if (failure == null) {
+                // factory could not create event so exit
+                return false;
+            }
+            if (doNotifyEvent(candidate, failure)) {
+                notified = true;
+            }
+        }
+        return notified;
+    }
+
     public static boolean notifyRouteStarted(CamelContext context, Route 
route) {
         ManagementStrategy management = context.getManagementStrategy();
         if (management == null) {

Reply via email to