This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.21.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.21.x by this push:
     new 6aaa0718df7 CAMEL-20435: camel-core - Resequencer EIP cannot be started again after being stopped (#13164)
6aaa0718df7 is described below

commit 6aaa0718df7e3cd428f349de4b95e5b72fc12375
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Feb 19 08:46:44 2024 +0100

    CAMEL-20435: camel-core - Resequencer EIP cannot be started again after being stopped (#13164)
---
 .../org/apache/camel/processor/Resequencer.java | 15 ++++++++++-----
 .../apache/camel/processor/ResequencerTest.java | 22 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
index 1c2db9b6ca1..378b52cfb9f 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -84,8 +84,7 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
     private final AsyncProcessor processor;
     private final Collection<Exchange> collection;
     private ExceptionHandler exceptionHandler;
-
-    private final BatchSender sender;
+    private BatchSender sender;
 
     public Resequencer(CamelContext camelContext, Processor processor, Expression expression) {
         this(camelContext, processor, createSet(expression, false, false), expression);
@@ -107,7 +106,6 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
         this.processor = AsyncProcessorConverterHelper.convert(processor);
         this.collection = collection;
         this.expression = expression;
-        this.sender = new BatchSender();
         this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
     }
 
@@ -340,14 +338,21 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(processor);
+        sender = new BatchSender();
         sender.start();
     }
 
     @Override
     protected void doStop() throws Exception {
-        sender.cancel();
+        if (sender != null) {
+            try {
+                sender.cancel();
+            } catch (Exception e) {
+                // ignore
+            }
+            sender = null;
+        }
         ServiceHelper.stopService(processor);
-        collection.clear();
     }
 
     /**
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
index 1951fd46544..3dfb03512c0 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
@@ -43,6 +43,25 @@ public class ResequencerTest extends ContextTestSupport {
         resultEndpoint.assertIsSatisfied();
     }
 
+    @Test
+    public void testRestartRoute() throws Exception {
+        resultEndpoint.expectedBodiesReceived("Guillaume", "Hiram", "James", "Rob");
+        sendBodies("direct:start", "Rob", "Hiram", "Guillaume", "James");
+        resultEndpoint.assertIsSatisfied();
+
+        context.getRouteController().stopRoute("myRoute");
+
+        // wait just a little bit
+        Thread.sleep(5);
+        resultEndpoint.reset();
+
+        context.getRouteController().startRoute("myRoute");
+
+        resultEndpoint.expectedBodiesReceived("Donald", "Goofy", "Jack");
+        sendBodies("direct:start", "Jack", "Donald", "Goofy");
+        resultEndpoint.assertIsSatisfied();
+    }
+
     @Override
     @BeforeEach
     public void setUp() throws Exception {
@@ -67,7 +86,8 @@ public class ResequencerTest extends ContextTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: example
-                from("direct:start").resequence().body().timeout(50).to("mock:result");
+                from("direct:start").routeId("myRoute")
+                        .resequence().body().timeout(50).to("mock:result");
                 // END SNIPPET: example
             }
         };