This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch saga in repository https://gitbox.apache.org/repos/asf/camel.git
commit d3bfba9adc3bdac52ac1b0f303fd2597e191c77b Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jun 28 15:46:35 2024 +0200 CAMEL-18186: camel-saga - InMemorySagaCoordinator should reuse the existing exchange so that, for example, distributed tracing spans stay the same instead of starting an entirely new span. --- .../org/apache/camel/catalog/others/lra.json | 2 +- components/camel-lra/pom.xml | 2 +- .../camel-lra/src/generated/resources/lra.json | 2 +- .../camel/service/lra/LRAManualInMemoryTest.java | 106 +++++++++++++++++++++ .../apache/camel/saga/InMemorySagaCoordinator.java | 54 ++++++----- 5 files changed, 138 insertions(+), 28 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/lra.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/lra.json index 75d15794854..3f1dc29d11f 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/lra.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/lra.json @@ -6,7 +6,7 @@ "description": "Camel saga binding for Long-Running-Action framework", "deprecated": false, "firstVersion": "2.21.0", - "label": "database, microservice", + "label": "microservice", "supportLevel": "Preview", "groupId": "org.apache.camel", "artifactId": "camel-lra", diff --git a/components/camel-lra/pom.xml b/components/camel-lra/pom.xml index 51f4e626838..32e715d168f 100644 --- a/components/camel-lra/pom.xml +++ b/components/camel-lra/pom.xml @@ -35,7 +35,7 @@ <!-- used by camel-catalog --> <firstVersion>2.21.0</firstVersion> <title>LRA</title> - <label>database, microservice</label> + <label>microservice</label> <supportLevel>preview</supportLevel> <!-- LRA coordinator container is only available for x86 --> diff --git a/components/camel-lra/src/generated/resources/lra.json b/components/camel-lra/src/generated/resources/lra.json index 75d15794854..3f1dc29d11f 100644 --- 
a/components/camel-lra/src/generated/resources/lra.json +++ b/components/camel-lra/src/generated/resources/lra.json @@ -6,7 +6,7 @@ "description": "Camel saga binding for Long-Running-Action framework", "deprecated": false, "firstVersion": "2.21.0", - "label": "database, microservice", + "label": "microservice", "supportLevel": "Preview", "groupId": "org.apache.camel", "artifactId": "camel-lra", diff --git a/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAManualInMemoryTest.java b/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAManualInMemoryTest.java new file mode 100644 index 00000000000..a9eae4170d6 --- /dev/null +++ b/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAManualInMemoryTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.service.lra; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.model.SagaCompletionMode; +import org.apache.camel.saga.InMemorySagaService; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +public class LRAManualInMemoryTest extends CamelTestSupport { + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + InMemorySagaService ss = new InMemorySagaService(); + context.addService(ss); + + return context; + } + + @Test + public void testCompletion() throws InterruptedException { + MockEndpoint completeEndpoint = getMockEndpoint("mock:complete"); + completeEndpoint.expectedMessageCount(1); + completeEndpoint.expectedHeaderReceived("id", "1"); + + sendBody("direct:saga", "hello", Collections.singletonMap("myid", "1")); + + completeEndpoint.assertIsSatisfied(); + } + + @Test + public void testFailure() throws InterruptedException { + MockEndpoint compensateEndpoint = getMockEndpoint("mock:compensate"); + compensateEndpoint.expectedMessageCount(1); + + sendBody("direct:saga", "fail"); + + compensateEndpoint.assertIsSatisfied(); + } + + @Test + public void testTimeout() throws InterruptedException { + MockEndpoint compensateEndpoint = getMockEndpoint("mock:compensate"); + compensateEndpoint.expectedMessageCount(1); + + sendBody("direct:saga", "timeout"); + + compensateEndpoint.assertIsSatisfied(); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + + from("direct:saga") + .saga() + .completionMode(SagaCompletionMode.MANUAL) + .timeout(1, TimeUnit.SECONDS) + .option("id", header("myid")) + 
.completion("direct:complete") + .compensation("direct:compensate") + .to("mock:endpoint") + .choice() + .when(body().isEqualTo("fail")) + .to("saga:compensate") + .when(body().isNotEqualTo("timeout")) + .to("saga:complete") + .end(); + + from("direct:complete") + .log("YES!") + .to("mock:complete"); + + from("direct:compensate") + .log("NO :(") + .to("mock:compensate"); + + } + }; + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java index 472d21ad975..51b6ab689c4 100644 --- a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java +++ b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java @@ -18,6 +18,7 @@ package org.apache.camel.saga; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -38,7 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A in-memory implementation of a saga coordinator. + * An in-memory implementation of a saga coordinator. 
*/ public class InMemorySagaCoordinator implements CamelSagaCoordinator { @@ -86,15 +87,20 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator { if (!step.getOptions().isEmpty()) { optionValues.putIfAbsent(step, new ConcurrentHashMap<>()); - Map<String, Object> values = optionValues.get(step); + Map<String, Object> values = optionValues.computeIfAbsent(step, k -> new HashMap<>()); for (String option : step.getOptions().keySet()) { Expression expression = step.getOptions().get(option); - try { - values.put(option, expression.evaluate(exchange, Object.class)); - } catch (Exception ex) { - return CompletableFuture.supplyAsync(() -> { - throw new RuntimeCamelException("Cannot evaluate saga option '" + option + "'", ex); - }); + if (expression != null) { + try { + Object value = expression.evaluate(exchange, Object.class); + if (value != null) { + values.put(option, value); + } + } catch (Exception ex) { + return CompletableFuture.supplyAsync(() -> { + throw new RuntimeCamelException("Cannot evaluate saga option '" + option + "'", ex); + }); + } } } } @@ -103,7 +109,7 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator { sagaService.getExecutorService().schedule(() -> { boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPENSATING); if (doAction) { - doCompensate(); + doCompensate(exchange); } }, step.getTimeoutInMilliseconds().get(), TimeUnit.MILLISECONDS); } @@ -116,7 +122,7 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator { boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPENSATING); if (doAction) { - doCompensate(); + doCompensate(exchange); } else { Status status = currentStatus.get(); if (status != Status.COMPENSATING && status != Status.COMPENSATED) { @@ -134,7 +140,7 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator { boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPLETING); if (doAction) { - doComplete(); + 
doComplete(exchange); } else { Status status = currentStatus.get(); if (status != Status.COMPLETING && status != Status.COMPLETED) { @@ -147,30 +153,30 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator { return CompletableFuture.completedFuture(null); } - public CompletableFuture<Boolean> doCompensate() { - return doFinalize(CamelSagaStep::getCompensation, "compensation") + public CompletableFuture<Boolean> doCompensate(final Exchange exchange) { + return doFinalize(exchange, CamelSagaStep::getCompensation, "compensation") .thenApply(res -> { currentStatus.set(Status.COMPENSATED); return res; }); } - public CompletableFuture<Boolean> doComplete() { - return doFinalize(CamelSagaStep::getCompletion, "completion") + public CompletableFuture<Boolean> doComplete(final Exchange exchange) { + return doFinalize(exchange, CamelSagaStep::getCompletion, "completion") .thenApply(res -> { currentStatus.set(Status.COMPLETED); return res; }); } - public CompletableFuture<Boolean> doFinalize( + public CompletableFuture<Boolean> doFinalize(final Exchange exchange, Function<CamelSagaStep, Optional<Endpoint>> endpointExtractor, String description) { CompletableFuture<Boolean> result = CompletableFuture.completedFuture(true); for (CamelSagaStep step : reversed(steps)) { Optional<Endpoint> endpoint = endpointExtractor.apply(step); if (endpoint.isPresent()) { result = result.thenCompose( - prevResult -> doFinalize(endpoint.get(), step, 0, description).thenApply(res -> prevResult && res)); + prevResult -> doFinalize(exchange, endpoint.get(), step, 0, description).thenApply(res -> prevResult && res)); } } return result.whenComplete((done, ex) -> { @@ -182,8 +188,8 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator { }); } - private CompletableFuture<Boolean> doFinalize(Endpoint endpoint, CamelSagaStep step, int doneAttempts, String description) { - Exchange exchange = createExchange(endpoint, step); + private CompletableFuture<Boolean> 
doFinalize(Exchange exchange, Endpoint endpoint, CamelSagaStep step, int doneAttempts, String description) { + populateExchange(exchange, step); return CompletableFuture.supplyAsync(() -> { Exchange res = camelContext.createFluentProducerTemplate().to(endpoint).withExchange(exchange).send(); @@ -205,7 +211,7 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator { } else { CompletableFuture<Boolean> future = new CompletableFuture<>(); sagaService.getExecutorService().schedule(() -> { - doFinalize(endpoint, step, currentAttempt, description).whenComplete((res, ex) -> { + doFinalize(exchange, endpoint, step, currentAttempt, description).whenComplete((res, ex) -> { if (ex != null) { future.completeExceptionally(ex); } else { @@ -218,17 +224,15 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator { }); } - private Exchange createExchange(Endpoint endpoint, CamelSagaStep step) { - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, getId()); + private void populateExchange(Exchange exchange, CamelSagaStep step) { + exchange.getMessage().setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, getId()); Map<String, Object> values = optionValues.get(step); if (values != null) { for (Map.Entry<String, Object> entry : values.entrySet()) { - exchange.getIn().setHeader(entry.getKey(), entry.getValue()); + exchange.getMessage().setHeader(entry.getKey(), entry.getValue()); } } - return exchange; } private <T> List<T> reversed(List<T> list) {