This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.21.x by this push:
     new 6aaa0718df7 CAMEL-20435: camel-core - Resequencer EIP cannot be started again after being stopped (#13164)
6aaa0718df7 is described below

commit 6aaa0718df7e3cd428f349de4b95e5b72fc12375
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Feb 19 08:46:44 2024 +0100

    CAMEL-20435: camel-core - Resequencer EIP cannot be started again after being stopped (#13164)
---
 .../org/apache/camel/processor/Resequencer.java    | 15 ++++++++++-----
 .../apache/camel/processor/ResequencerTest.java    | 22 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
index 1c2db9b6ca1..378b52cfb9f 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -84,8 +84,7 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
     private final AsyncProcessor processor;
     private final Collection<Exchange> collection;
     private ExceptionHandler exceptionHandler;
-
-    private final BatchSender sender;
+    private BatchSender sender;
 
     public Resequencer(CamelContext camelContext, Processor processor, Expression expression) {
         this(camelContext, processor, createSet(expression, false, false), expression);
@@ -107,7 +106,6 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
         this.processor = AsyncProcessorConverterHelper.convert(processor);
         this.collection = collection;
         this.expression = expression;
-        this.sender = new BatchSender();
         this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
     }
 
@@ -340,14 +338,21 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(processor);
+        sender = new BatchSender();
         sender.start();
     }
 
     @Override
     protected void doStop() throws Exception {
-        sender.cancel();
+        if (sender != null) {
+            try {
+                sender.cancel();
+            } catch (Exception e) {
+                // ignore
+            }
+            sender = null;
+        }
         ServiceHelper.stopService(processor);
-        collection.clear();
     }
 
     /**
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
index 1951fd46544..3dfb03512c0 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
@@ -43,6 +43,25 @@ public class ResequencerTest extends ContextTestSupport {
         resultEndpoint.assertIsSatisfied();
     }
 
+    @Test
+    public void testRestartRoute() throws Exception {
+        resultEndpoint.expectedBodiesReceived("Guillaume", "Hiram", "James", "Rob");
+        sendBodies("direct:start", "Rob", "Hiram", "Guillaume", "James");
+        resultEndpoint.assertIsSatisfied();
+
+        context.getRouteController().stopRoute("myRoute");
+
+        // wait just a little bit
+        Thread.sleep(5);
+        resultEndpoint.reset();
+
+        context.getRouteController().startRoute("myRoute");
+
+        resultEndpoint.expectedBodiesReceived("Donald", "Goofy", "Jack");
+        sendBodies("direct:start", "Jack", "Donald", "Goofy");
+        resultEndpoint.assertIsSatisfied();
+    }
+
     @Override
     @BeforeEach
     public void setUp() throws Exception {
@@ -67,7 +86,8 @@ public class ResequencerTest extends ContextTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: example
-                from("direct:start").resequence().body().timeout(50).to("mock:result");
+                from("direct:start").routeId("myRoute")
+                        .resequence().body().timeout(50).to("mock:result");
                 // END SNIPPET: example
             }
         };

Reply via email to