This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.0.x by this push:
new 2e8bd5ebcbd CAMEL-20435: camel-core - Resequencer EIP cannot be
started again after being stopped (#13164)
2e8bd5ebcbd is described below
commit 2e8bd5ebcbd057acbb58163baf0ae281a7480409
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Feb 19 08:46:44 2024 +0100
CAMEL-20435: camel-core - Resequencer EIP cannot be started again after
being stopped (#13164)
---
.../org/apache/camel/processor/Resequencer.java | 15 ++++++++++-----
.../apache/camel/processor/ResequencerTest.java | 22 +++++++++++++++++++++-
2 files changed, 31 insertions(+), 6 deletions(-)
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
index 355fc7705d3..2ed72757826 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -84,8 +84,7 @@ public class Resequencer extends AsyncProcessorSupport
implements Navigate<Proce
private final AsyncProcessor processor;
private final Collection<Exchange> collection;
private ExceptionHandler exceptionHandler;
-
- private final BatchSender sender;
+ private BatchSender sender;
public Resequencer(CamelContext camelContext, Processor processor,
Expression expression) {
this(camelContext, processor, createSet(expression, false, false),
expression);
@@ -107,7 +106,6 @@ public class Resequencer extends AsyncProcessorSupport
implements Navigate<Proce
this.processor = AsyncProcessorConverterHelper.convert(processor);
this.collection = collection;
this.expression = expression;
- this.sender = new BatchSender();
this.exceptionHandler = new LoggingExceptionHandler(camelContext,
getClass());
}
@@ -340,14 +338,21 @@ public class Resequencer extends AsyncProcessorSupport
implements Navigate<Proce
@Override
protected void doStart() throws Exception {
ServiceHelper.startService(processor);
+ sender = new BatchSender();
sender.start();
}
@Override
protected void doStop() throws Exception {
- sender.cancel();
+ if (sender != null) {
+ try {
+ sender.cancel();
+ } catch (Exception e) {
+ // ignore
+ }
+ sender = null;
+ }
ServiceHelper.stopService(processor);
- collection.clear();
}
/**
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
index 1951fd46544..3dfb03512c0 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
@@ -43,6 +43,25 @@ public class ResequencerTest extends ContextTestSupport {
resultEndpoint.assertIsSatisfied();
}
+ @Test
+ public void testRestartRoute() throws Exception {
+ resultEndpoint.expectedBodiesReceived("Guillaume", "Hiram", "James",
"Rob");
+ sendBodies("direct:start", "Rob", "Hiram", "Guillaume", "James");
+ resultEndpoint.assertIsSatisfied();
+
+ context.getRouteController().stopRoute("myRoute");
+
+ // wait just a little bit
+ Thread.sleep(5);
+ resultEndpoint.reset();
+
+ context.getRouteController().startRoute("myRoute");
+
+ resultEndpoint.expectedBodiesReceived("Donald", "Goofy", "Jack");
+ sendBodies("direct:start", "Jack", "Donald", "Goofy");
+ resultEndpoint.assertIsSatisfied();
+ }
+
@Override
@BeforeEach
public void setUp() throws Exception {
@@ -67,7 +86,8 @@ public class ResequencerTest extends ContextTestSupport {
return new RouteBuilder() {
public void configure() {
// START SNIPPET: example
-
from("direct:start").resequence().body().timeout(50).to("mock:result");
+ from("direct:start").routeId("myRoute")
+ .resequence().body().timeout(50).to("mock:result");
// END SNIPPET: example
}
};