This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new ba832e21763 CAMEL-20435: camel-core - Resequencer EIP cannot be started again after being stopped (#13164) ba832e21763 is described below commit ba832e21763cb4938934876ad83dc232628b3549 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Feb 19 08:46:44 2024 +0100 CAMEL-20435: camel-core - Resequencer EIP cannot be started again after being stopped (#13164) --- .../org/apache/camel/processor/Resequencer.java | 15 ++++++++++----- .../apache/camel/processor/ResequencerTest.java | 22 +++++++++++++++++++++- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java index 68585f92c50..37f25e92ca2 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java @@ -84,8 +84,7 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce private final AsyncProcessor processor; private final Collection<Exchange> collection; private ExceptionHandler exceptionHandler; - - private final BatchSender sender; + private BatchSender sender; public Resequencer(CamelContext camelContext, Processor processor, Expression expression) { this(camelContext, processor, createSet(expression, false, false), expression); @@ -107,7 +106,6 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce this.processor = AsyncProcessorConverterHelper.convert(processor); this.collection = collection; this.expression = expression; - this.sender = new BatchSender(); this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass()); } @@ -340,14 +338,21 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce @Override protected void doStart() throws Exception { ServiceHelper.startService(processor); + sender = new BatchSender(); sender.start(); } @Override protected void doStop() throws Exception { - sender.cancel(); + if (sender != null) { + try { + sender.cancel(); + } catch (Exception e) { + // ignore + } + sender = null; + } ServiceHelper.stopService(processor); - collection.clear(); } /** diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java index 1951fd46544..3dfb03512c0 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java @@ -43,6 +43,25 @@ public class ResequencerTest extends ContextTestSupport { resultEndpoint.assertIsSatisfied(); } + @Test + public void testRestartRoute() throws Exception { + resultEndpoint.expectedBodiesReceived("Guillaume", "Hiram", "James", "Rob"); + sendBodies("direct:start", "Rob", "Hiram", "Guillaume", "James"); + resultEndpoint.assertIsSatisfied(); + + context.getRouteController().stopRoute("myRoute"); + + // wait just a little bit + Thread.sleep(5); + resultEndpoint.reset(); + + context.getRouteController().startRoute("myRoute"); + + resultEndpoint.expectedBodiesReceived("Donald", "Goofy", "Jack"); + sendBodies("direct:start", "Jack", "Donald", "Goofy"); + resultEndpoint.assertIsSatisfied(); + } + @Override @BeforeEach public void setUp() throws Exception { @@ -67,7 +86,8 @@ public class ResequencerTest extends ContextTestSupport { return new RouteBuilder() { public void configure() { // START SNIPPET: example - from("direct:start").resequence().body().timeout(50).to("mock:result"); + from("direct:start").routeId("myRoute") + .resequence().body().timeout(50).to("mock:result"); // END SNIPPET: example } };