This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 7024bdb4d2e CAMEL-22259: camel-core - Splitter/Multicast with shareUnitOfWork sho… (#18684) 7024bdb4d2e is described below commit 7024bdb4d2e51d66e21d5536929b21727357644e Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Jul 23 10:44:16 2025 +0200 CAMEL-22259: camel-core - Splitter/Multicast with shareUnitOfWork sho… (#18684) * CAMEL-22259: camel-core - Splitter/Multicast with shareUnitOfWork should use single uow * CAMEL-22263: camel-core: Deprecate ARENT_UNIT_OF_WORK --- .../camel/component/undertow/ExchangeHeaders.java | 1 + .../src/main/java/org/apache/camel/Exchange.java | 1 + .../apache/camel/spi/InternalProcessorFactory.java | 3 - .../camel/impl/engine/CamelInternalProcessor.java | 20 ---- .../processor/DefaultInternalProcessorFactory.java | 8 -- .../java/org/apache/camel/processor/Enricher.java | 1 - .../apache/camel/processor/MulticastProcessor.java | 18 +--- ...xOriginalMessageBodyAndEnrichedHeadersTest.java | 5 +- .../processor/SplitterShareUnitOfWorkTest.java | 104 +++++++++++++++++++++ .../ROOT/pages/camel-4x-upgrade-guide-4_14.adoc | 11 +++ 10 files changed, 124 insertions(+), 48 deletions(-) diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/ExchangeHeaders.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/ExchangeHeaders.java index cf62ba5b4da..857744234e3 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/ExchangeHeaders.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/ExchangeHeaders.java @@ -118,6 +118,7 @@ public final class ExchangeHeaders { public static final HttpString ON_COMPLETION = new HttpString("CamelOnCompletion"); public static final HttpString OVERRULE_FILE_NAME = new HttpString("CamelOverruleFileName"); + @Deprecated public static final HttpString PARENT_UNIT_OF_WORK = new 
HttpString("CamelParentUnitOfWork"); public static final HttpString RECIPIENT_LIST_ENDPOINT = new HttpString("CamelRecipientListEndpoint"); diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index 02b659cb40e..7ac40e9981b 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -255,6 +255,7 @@ public interface Exchange extends VariableAware { String OFFSET = "CamelOffset"; String OVERRULE_FILE_NAME = "CamelOverruleFileName"; + @Deprecated(since = "4.14.0") String PARENT_UNIT_OF_WORK = "CamelParentUnitOfWork"; String STREAM_CACHE_UNIT_OF_WORK = "CamelStreamCacheUnitOfWork"; diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java index 0d2424ddd0f..e9486263fee 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java @@ -43,9 +43,6 @@ public interface InternalProcessorFactory { InternalProcessor addUnitOfWorkProcessorAdvice(CamelContext camelContext, Processor processor, Route route); - InternalProcessor addChildUnitOfWorkProcessorAdvice( - CamelContext camelContext, Processor processor, Route route, UnitOfWork parent); - SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext camelContext); Channel createChannel(CamelContext camelContext); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 37f0f8b536a..b4764f615d5 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -892,26 +892,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } - /** - * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality. - */ - public static class ChildUnitOfWorkProcessorAdvice extends UnitOfWorkProcessorAdvice { - - private final UnitOfWork parent; - - public ChildUnitOfWorkProcessorAdvice(Route route, CamelContext camelContext, UnitOfWork parent) { - super(route, camelContext); - this.parent = parent; - } - - @Override - protected UnitOfWork createUnitOfWork(Exchange exchange) { - // let the parent create a child unit of work to be used - return parent.createChildUnitOfWork(exchange); - } - - } - /** * Advice when Message History has been enabled. */ diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java index 74118ab510a..6db4a56608a 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java @@ -32,7 +32,6 @@ import org.apache.camel.spi.InterceptSendToEndpoint; import org.apache.camel.spi.InternalProcessor; import org.apache.camel.spi.InternalProcessorFactory; import org.apache.camel.spi.SharedInternalProcessor; -import org.apache.camel.spi.UnitOfWork; import org.apache.camel.spi.annotations.JdkService; @JdkService(InternalProcessorFactory.FACTORY) @@ -44,13 +43,6 @@ public class DefaultInternalProcessorFactory implements InternalProcessorFactory return internal; } - public InternalProcessor addChildUnitOfWorkProcessorAdvice( - CamelContext camelContext, Processor processor, Route route, UnitOfWork parent) { - CamelInternalProcessor internal = new 
CamelInternalProcessor(camelContext, processor); - internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(route, camelContext, parent)); - return internal; - } - public SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext camelContext) { return new SharedCamelInternalProcessor( camelContext, new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, camelContext)); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java index 9424fe89a61..2a6709c6624 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java @@ -265,7 +265,6 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA // if we share unit of work, we need to prepare the resource exchange if (isShareUnitOfWork()) { - target.setProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, source.getUnitOfWork()); // and then share the unit of work target.getExchangeExtension().setUnitOfWork(source.getUnitOfWork()); } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index dbaf06d26f3..5fd4328f365 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -302,7 +302,6 @@ public class MulticastProcessor extends AsyncProcessorSupport wrapInErrorHandler(route, exchange, processor); } } - ServiceHelper.initService(processorExchangeFactory); } @@ -1086,13 +1085,10 @@ public class MulticastProcessor extends AsyncProcessorSupport // and wrap in unit of work processor so the copy exchange also can run under UoW answer = 
createUnitOfWorkProcessor(route, processor, exchange); - boolean child = exchange.getProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null; - // must start the error handler ServiceHelper.startService(answer); - // here we don't cache the child unit of work - if (!child && errorHandlers != null) { + if (errorHandlers != null) { errorHandlers.putIfAbsent(key, answer); } @@ -1125,25 +1121,21 @@ public class MulticastProcessor extends AsyncProcessorSupport */ protected Processor createUnitOfWorkProcessor(Route route, Processor processor, Exchange exchange) { // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW - UnitOfWork parent = exchange.getProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, UnitOfWork.class); - if (parent != null) { - return internalProcessorFactory.addChildUnitOfWorkProcessorAdvice(camelContext, processor, route, parent); - } else { - return internalProcessorFactory.addUnitOfWorkProcessorAdvice(camelContext, processor, route); - } + return internalProcessorFactory.addUnitOfWorkProcessorAdvice(camelContext, processor, route); } /** * Prepares the exchange for participating in a shared unit of work * <p/> - * This ensures a child exchange can access its parent {@link UnitOfWork} when it participate in a shared unit of + * This ensures a child exchange can access its parent {@link UnitOfWork} when it participates in a shared unit of * work. 
* * @param childExchange the child exchange * @param parentExchange the parent exchange */ protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) { - childExchange.setProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork()); + // share the unit of work on the child + childExchange.getExchangeExtension().setUnitOfWork(parentExchange.getUnitOfWork()); } @Override diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/MulticastMixOriginalMessageBodyAndEnrichedHeadersTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/MulticastMixOriginalMessageBodyAndEnrichedHeadersTest.java index 8bc833dee35..8fc2c32a056 100644 --- a/core/camel-core/src/test/java/org/apache/camel/issues/MulticastMixOriginalMessageBodyAndEnrichedHeadersTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/issues/MulticastMixOriginalMessageBodyAndEnrichedHeadersTest.java @@ -36,9 +36,8 @@ public class MulticastMixOriginalMessageBodyAndEnrichedHeadersTest extends Conte onException(Exception.class).handled(true) // we want to preserve the real original message body and - // then include other headers that have been - // set later during routing - .transform(simple("${exchangeProperty[CamelParentUnitOfWork].getOriginalInMessage().getBody()}")) + // then include other headers that have been set later during routing + .transform(simple("${originalBody}")) .to("mock:b"); from("direct:start").setBody(constant("Changed body")).setHeader("foo", constant("bar")).multicast() diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java new file mode 100644 index 00000000000..3cd672bc59d --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.UnitOfWork; +import org.apache.camel.support.SynchronizationAdapter; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SplitterShareUnitOfWorkTest extends ContextTestSupport { + + private final List<UnitOfWork> uows = new ArrayList<>(); + private final List<UnitOfWork> doneUows = new ArrayList<>(); + private final List<String> doneBodies = new ArrayList<>(); + + @Test + public void testShareUnitOfWork() throws Exception { + getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C"); + getMockEndpoint("mock:result").expectedBodiesReceived("A+B+C"); + + template.sendBody("direct:start", "A,B,C"); + + assertMockEndpointsSatisfied(); + + Assertions.assertEquals(3, uows.size()); + + // all in-flight uows should be the same + Assertions.assertSame(uows.get(0), uows.get(1)); + Assertions.assertSame(uows.get(1), uows.get(2)); + Assertions.assertSame(uows.get(2), uows.get(0)); + + // and done uow should be the 
same + Assertions.assertSame(uows.get(0), doneUows.get(0)); + Assertions.assertSame(uows.get(1), doneUows.get(1)); + Assertions.assertSame(uows.get(2), doneUows.get(2)); + + // uow is done after the entire route so the exchange body is the output from the aggregation strategy + Assertions.assertEquals("A+B+C", doneBodies.get(0)); + Assertions.assertEquals("A+B+C", doneBodies.get(1)); + Assertions.assertEquals("A+B+C", doneBodies.get(2)); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:start").split(body(), new MyStrategy()).shareUnitOfWork() + .process(e -> { + var u = e.getUnitOfWork(); + uows.add(u); + u.addSynchronization(new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + var b = exchange.getMessage().getBody(String.class); + doneBodies.add(b); + var u = exchange.getUnitOfWork(); + doneUows.add(u); + + // should only be invoked after all is complete (3 line and 1 result) + Assertions.assertEquals(3, getMockEndpoint("mock:line").getReceivedCounter()); + Assertions.assertEquals(1, getMockEndpoint("mock:result").getReceivedCounter()); + } + }); + }) + .to("mock:line").end().to("mock:result"); + } + }; + } + + private static class MyStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + String body = oldExchange.getIn().getBody() + "+" + newExchange.getIn().getBody(); + oldExchange.getIn().setBody(body); + return oldExchange; + } + } + +} diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc index 78759b83a6a..c015d5e96dc 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc @@ -8,6 +8,17 @@ from both 4.0 to 4.1 
and 4.1 to 4.2. === camel-core +==== Splitter and Multicast EIPs + +When using `shareUnitOfWork=true` in Split or Multicast EIPs, Camel will now use a single shared `UnitOfWork` instance (parent) +for the entire body of work. So if the Splitter is splitting into 1000 sub-messages, then each of them will now reuse +the same `UnitOfWork`, and any completion tasks that each sub-message adds will now be executed later, when the parent `UnitOfWork` +is complete, usually when the original message is completed. + +Previously, each sub-message was independent (even though the documentation stated otherwise). This feature +has behaved incorrectly for many years, which went unnoticed because it is rarely in use. We had the opportunity to look into this as part +of an issue, and felt it is better to fix this before this LTS release. + === camel-jbang The `camel export` will not include `camel-observabilities-services` out of the box. To include this, then use `--observe` to enable