This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new ae603ebb9b9 CAMEL-18186: camel-saga - InMemorySagaCoordinator should 
reuse existi… (#14678)
ae603ebb9b9 is described below

commit ae603ebb9b916bd4fb1562ea794437bc133e0f02
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Jun 28 16:15:44 2024 +0200

    CAMEL-18186: camel-saga - InMemorySagaCoordinator should reuse existi… 
(#14678)
    
    * CAMEL-18186: camel-saga - InMemorySagaCoordinator should reuse the 
existing exchange so that, for example, distributed tracing spans stay the 
same instead of starting an entirely new span.
---
 .../org/apache/camel/catalog/others/lra.json       |   2 +-
 components/camel-lra/pom.xml                       |   2 +-
 .../camel-lra/src/generated/resources/lra.json     |   2 +-
 .../camel/service/lra/LRAManualInMemoryTest.java   | 106 +++++++++++++++++++++
 .../apache/camel/saga/InMemorySagaCoordinator.java |  55 ++++++-----
 5 files changed, 140 insertions(+), 27 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/lra.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/lra.json
index 75d15794854..3f1dc29d11f 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/lra.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/lra.json
@@ -6,7 +6,7 @@
     "description": "Camel saga binding for Long-Running-Action framework",
     "deprecated": false,
     "firstVersion": "2.21.0",
-    "label": "database, microservice",
+    "label": "microservice",
     "supportLevel": "Preview",
     "groupId": "org.apache.camel",
     "artifactId": "camel-lra",
diff --git a/components/camel-lra/pom.xml b/components/camel-lra/pom.xml
index 51f4e626838..32e715d168f 100644
--- a/components/camel-lra/pom.xml
+++ b/components/camel-lra/pom.xml
@@ -35,7 +35,7 @@
         <!-- used by camel-catalog -->
         <firstVersion>2.21.0</firstVersion>
         <title>LRA</title>
-        <label>database, microservice</label>
+        <label>microservice</label>
         <supportLevel>preview</supportLevel>
 
         <!-- LRA coordinator container is only available for x86 -->
diff --git a/components/camel-lra/src/generated/resources/lra.json 
b/components/camel-lra/src/generated/resources/lra.json
index 75d15794854..3f1dc29d11f 100644
--- a/components/camel-lra/src/generated/resources/lra.json
+++ b/components/camel-lra/src/generated/resources/lra.json
@@ -6,7 +6,7 @@
     "description": "Camel saga binding for Long-Running-Action framework",
     "deprecated": false,
     "firstVersion": "2.21.0",
-    "label": "database, microservice",
+    "label": "microservice",
     "supportLevel": "Preview",
     "groupId": "org.apache.camel",
     "artifactId": "camel-lra",
diff --git 
a/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAManualInMemoryTest.java
 
b/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAManualInMemoryTest.java
new file mode 100644
index 00000000000..a9eae4170d6
--- /dev/null
+++ 
b/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAManualInMemoryTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.service.lra;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.model.SagaCompletionMode;
+import org.apache.camel.saga.InMemorySagaService;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+public class LRAManualInMemoryTest extends CamelTestSupport {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        InMemorySagaService ss = new InMemorySagaService();
+        context.addService(ss);
+
+        return context;
+    }
+
+    @Test
+    public void testCompletion() throws InterruptedException {
+        MockEndpoint completeEndpoint = getMockEndpoint("mock:complete");
+        completeEndpoint.expectedMessageCount(1);
+        completeEndpoint.expectedHeaderReceived("id", "1");
+
+        sendBody("direct:saga", "hello", Collections.singletonMap("myid", 
"1"));
+
+        completeEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testFailure() throws InterruptedException {
+        MockEndpoint compensateEndpoint = getMockEndpoint("mock:compensate");
+        compensateEndpoint.expectedMessageCount(1);
+
+        sendBody("direct:saga", "fail");
+
+        compensateEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testTimeout() throws InterruptedException {
+        MockEndpoint compensateEndpoint = getMockEndpoint("mock:compensate");
+        compensateEndpoint.expectedMessageCount(1);
+
+        sendBody("direct:saga", "timeout");
+
+        compensateEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+
+                from("direct:saga")
+                        .saga()
+                        .completionMode(SagaCompletionMode.MANUAL)
+                        .timeout(1, TimeUnit.SECONDS)
+                        .option("id", header("myid"))
+                        .completion("direct:complete")
+                        .compensation("direct:compensate")
+                        .to("mock:endpoint")
+                        .choice()
+                        .when(body().isEqualTo("fail"))
+                        .to("saga:compensate")
+                        .when(body().isNotEqualTo("timeout"))
+                        .to("saga:complete")
+                        .end();
+
+                from("direct:complete")
+                        .log("YES!")
+                        .to("mock:complete");
+
+                from("direct:compensate")
+                        .log("NO :(")
+                        .to("mock:compensate");
+
+            }
+        };
+    }
+}
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
 
b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
index 472d21ad975..7cb85441216 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
@@ -18,6 +18,7 @@ package org.apache.camel.saga;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -38,7 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A in-memory implementation of a saga coordinator.
+ * An in-memory implementation of a saga coordinator.
  */
 public class InMemorySagaCoordinator implements CamelSagaCoordinator {
 
@@ -86,15 +87,20 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
 
         if (!step.getOptions().isEmpty()) {
             optionValues.putIfAbsent(step, new ConcurrentHashMap<>());
-            Map<String, Object> values = optionValues.get(step);
+            Map<String, Object> values = optionValues.computeIfAbsent(step, k 
-> new HashMap<>());
             for (String option : step.getOptions().keySet()) {
                 Expression expression = step.getOptions().get(option);
-                try {
-                    values.put(option, expression.evaluate(exchange, 
Object.class));
-                } catch (Exception ex) {
-                    return CompletableFuture.supplyAsync(() -> {
-                        throw new RuntimeCamelException("Cannot evaluate saga 
option '" + option + "'", ex);
-                    });
+                if (expression != null) {
+                    try {
+                        Object value = expression.evaluate(exchange, 
Object.class);
+                        if (value != null) {
+                            values.put(option, value);
+                        }
+                    } catch (Exception ex) {
+                        return CompletableFuture.supplyAsync(() -> {
+                            throw new RuntimeCamelException("Cannot evaluate 
saga option '" + option + "'", ex);
+                        });
+                    }
                 }
             }
         }
@@ -103,7 +109,7 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
             sagaService.getExecutorService().schedule(() -> {
                 boolean doAction = currentStatus.compareAndSet(Status.RUNNING, 
Status.COMPENSATING);
                 if (doAction) {
-                    doCompensate();
+                    doCompensate(exchange);
                 }
             }, step.getTimeoutInMilliseconds().get(), TimeUnit.MILLISECONDS);
         }
@@ -116,7 +122,7 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
         boolean doAction = currentStatus.compareAndSet(Status.RUNNING, 
Status.COMPENSATING);
 
         if (doAction) {
-            doCompensate();
+            doCompensate(exchange);
         } else {
             Status status = currentStatus.get();
             if (status != Status.COMPENSATING && status != Status.COMPENSATED) 
{
@@ -134,7 +140,7 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
         boolean doAction = currentStatus.compareAndSet(Status.RUNNING, 
Status.COMPLETING);
 
         if (doAction) {
-            doComplete();
+            doComplete(exchange);
         } else {
             Status status = currentStatus.get();
             if (status != Status.COMPLETING && status != Status.COMPLETED) {
@@ -147,16 +153,16 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
         return CompletableFuture.completedFuture(null);
     }
 
-    public CompletableFuture<Boolean> doCompensate() {
-        return doFinalize(CamelSagaStep::getCompensation, "compensation")
+    public CompletableFuture<Boolean> doCompensate(final Exchange exchange) {
+        return doFinalize(exchange, CamelSagaStep::getCompensation, 
"compensation")
                 .thenApply(res -> {
                     currentStatus.set(Status.COMPENSATED);
                     return res;
                 });
     }
 
-    public CompletableFuture<Boolean> doComplete() {
-        return doFinalize(CamelSagaStep::getCompletion, "completion")
+    public CompletableFuture<Boolean> doComplete(final Exchange exchange) {
+        return doFinalize(exchange, CamelSagaStep::getCompletion, "completion")
                 .thenApply(res -> {
                     currentStatus.set(Status.COMPLETED);
                     return res;
@@ -164,13 +170,15 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
     }
 
     public CompletableFuture<Boolean> doFinalize(
+            final Exchange exchange,
             Function<CamelSagaStep, Optional<Endpoint>> endpointExtractor, 
String description) {
         CompletableFuture<Boolean> result = 
CompletableFuture.completedFuture(true);
         for (CamelSagaStep step : reversed(steps)) {
             Optional<Endpoint> endpoint = endpointExtractor.apply(step);
             if (endpoint.isPresent()) {
                 result = result.thenCompose(
-                        prevResult -> doFinalize(endpoint.get(), step, 0, 
description).thenApply(res -> prevResult && res));
+                        prevResult -> doFinalize(exchange, endpoint.get(), 
step, 0, description)
+                                .thenApply(res -> prevResult && res));
             }
         }
         return result.whenComplete((done, ex) -> {
@@ -182,8 +190,9 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
         });
     }
 
-    private CompletableFuture<Boolean> doFinalize(Endpoint endpoint, 
CamelSagaStep step, int doneAttempts, String description) {
-        Exchange exchange = createExchange(endpoint, step);
+    private CompletableFuture<Boolean> doFinalize(
+            Exchange exchange, Endpoint endpoint, CamelSagaStep step, int 
doneAttempts, String description) {
+        populateExchange(exchange, step);
 
         return CompletableFuture.supplyAsync(() -> {
             Exchange res = 
camelContext.createFluentProducerTemplate().to(endpoint).withExchange(exchange).send();
@@ -205,7 +214,7 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
             } else {
                 CompletableFuture<Boolean> future = new CompletableFuture<>();
                 sagaService.getExecutorService().schedule(() -> {
-                    doFinalize(endpoint, step, currentAttempt, 
description).whenComplete((res, ex) -> {
+                    doFinalize(exchange, endpoint, step, currentAttempt, 
description).whenComplete((res, ex) -> {
                         if (ex != null) {
                             future.completeExceptionally(ex);
                         } else {
@@ -218,17 +227,15 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
         });
     }
 
-    private Exchange createExchange(Endpoint endpoint, CamelSagaStep step) {
-        Exchange exchange = endpoint.createExchange();
-        exchange.getIn().setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, getId());
+    private void populateExchange(Exchange exchange, CamelSagaStep step) {
+        exchange.getMessage().setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, 
getId());
 
         Map<String, Object> values = optionValues.get(step);
         if (values != null) {
             for (Map.Entry<String, Object> entry : values.entrySet()) {
-                exchange.getIn().setHeader(entry.getKey(), entry.getValue());
+                exchange.getMessage().setHeader(entry.getKey(), 
entry.getValue());
             }
         }
-        return exchange;
     }
 
     private <T> List<T> reversed(List<T> list) {

Reply via email to