This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 7024bdb4d2e CAMEL-22259: camel-core - Splitter/Multicast with 
shareUnitOfWork sho… (#18684)
7024bdb4d2e is described below

commit 7024bdb4d2e51d66e21d5536929b21727357644e
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Jul 23 10:44:16 2025 +0200

    CAMEL-22259: camel-core - Splitter/Multicast with shareUnitOfWork sho… 
(#18684)
    
    * CAMEL-22259: camel-core - Splitter/Multicast with shareUnitOfWork should 
use single uow
    * CAMEL-22263: camel-core: Deprecate PARENT_UNIT_OF_WORK
---
 .../camel/component/undertow/ExchangeHeaders.java  |   1 +
 .../src/main/java/org/apache/camel/Exchange.java   |   1 +
 .../apache/camel/spi/InternalProcessorFactory.java |   3 -
 .../camel/impl/engine/CamelInternalProcessor.java  |  20 ----
 .../processor/DefaultInternalProcessorFactory.java |   8 --
 .../java/org/apache/camel/processor/Enricher.java  |   1 -
 .../apache/camel/processor/MulticastProcessor.java |  18 +---
 ...xOriginalMessageBodyAndEnrichedHeadersTest.java |   5 +-
 .../processor/SplitterShareUnitOfWorkTest.java     | 104 +++++++++++++++++++++
 .../ROOT/pages/camel-4x-upgrade-guide-4_14.adoc    |  11 +++
 10 files changed, 124 insertions(+), 48 deletions(-)

diff --git 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/ExchangeHeaders.java
 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/ExchangeHeaders.java
index cf62ba5b4da..857744234e3 100644
--- 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/ExchangeHeaders.java
+++ 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/ExchangeHeaders.java
@@ -118,6 +118,7 @@ public final class ExchangeHeaders {
     public static final HttpString ON_COMPLETION = new 
HttpString("CamelOnCompletion");
     public static final HttpString OVERRULE_FILE_NAME = new 
HttpString("CamelOverruleFileName");
 
+    @Deprecated
     public static final HttpString PARENT_UNIT_OF_WORK = new 
HttpString("CamelParentUnitOfWork");
 
     public static final HttpString RECIPIENT_LIST_ENDPOINT = new 
HttpString("CamelRecipientListEndpoint");
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java 
b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 02b659cb40e..7ac40e9981b 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -255,6 +255,7 @@ public interface Exchange extends VariableAware {
     String OFFSET = "CamelOffset";
     String OVERRULE_FILE_NAME = "CamelOverruleFileName";
 
+    @Deprecated(since = "4.14.0")
     String PARENT_UNIT_OF_WORK = "CamelParentUnitOfWork";
     String STREAM_CACHE_UNIT_OF_WORK = "CamelStreamCacheUnitOfWork";
 
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
 
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
index 0d2424ddd0f..e9486263fee 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
@@ -43,9 +43,6 @@ public interface InternalProcessorFactory {
 
     InternalProcessor addUnitOfWorkProcessorAdvice(CamelContext camelContext, 
Processor processor, Route route);
 
-    InternalProcessor addChildUnitOfWorkProcessorAdvice(
-            CamelContext camelContext, Processor processor, Route route, 
UnitOfWork parent);
-
     SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext 
camelContext);
 
     Channel createChannel(CamelContext camelContext);
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 37f0f8b536a..b4764f615d5 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -892,26 +892,6 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
 
     }
 
-    /**
-     * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
-     */
-    public static class ChildUnitOfWorkProcessorAdvice extends 
UnitOfWorkProcessorAdvice {
-
-        private final UnitOfWork parent;
-
-        public ChildUnitOfWorkProcessorAdvice(Route route, CamelContext 
camelContext, UnitOfWork parent) {
-            super(route, camelContext);
-            this.parent = parent;
-        }
-
-        @Override
-        protected UnitOfWork createUnitOfWork(Exchange exchange) {
-            // let the parent create a child unit of work to be used
-            return parent.createChildUnitOfWork(exchange);
-        }
-
-    }
-
     /**
      * Advice when Message History has been enabled.
      */
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
index 74118ab510a..6db4a56608a 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
@@ -32,7 +32,6 @@ import org.apache.camel.spi.InterceptSendToEndpoint;
 import org.apache.camel.spi.InternalProcessor;
 import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.SharedInternalProcessor;
-import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.spi.annotations.JdkService;
 
 @JdkService(InternalProcessorFactory.FACTORY)
@@ -44,13 +43,6 @@ public class DefaultInternalProcessorFactory implements 
InternalProcessorFactory
         return internal;
     }
 
-    public InternalProcessor addChildUnitOfWorkProcessorAdvice(
-            CamelContext camelContext, Processor processor, Route route, 
UnitOfWork parent) {
-        CamelInternalProcessor internal = new 
CamelInternalProcessor(camelContext, processor);
-        internal.addAdvice(new 
CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(route, camelContext, 
parent));
-        return internal;
-    }
-
     public SharedInternalProcessor 
createSharedCamelInternalProcessor(CamelContext camelContext) {
         return new SharedCamelInternalProcessor(
                 camelContext, new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, camelContext));
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index 9424fe89a61..2a6709c6624 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -265,7 +265,6 @@ public class Enricher extends AsyncProcessorSupport 
implements IdAware, RouteIdA
 
         // if we share unit of work, we need to prepare the resource exchange
         if (isShareUnitOfWork()) {
-            target.setProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, 
source.getUnitOfWork());
             // and then share the unit of work
             
target.getExchangeExtension().setUnitOfWork(source.getUnitOfWork());
         }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index dbaf06d26f3..5fd4328f365 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -302,7 +302,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                 wrapInErrorHandler(route, exchange, processor);
             }
         }
-
         ServiceHelper.initService(processorExchangeFactory);
     }
 
@@ -1086,13 +1085,10 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                 // and wrap in unit of work processor so the copy exchange 
also can run under UoW
                 answer = createUnitOfWorkProcessor(route, processor, exchange);
 
-                boolean child = 
exchange.getProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, UnitOfWork.class) 
!= null;
-
                 // must start the error handler
                 ServiceHelper.startService(answer);
 
-                // here we don't cache the child unit of work
-                if (!child && errorHandlers != null) {
+                if (errorHandlers != null) {
                     errorHandlers.putIfAbsent(key, answer);
                 }
 
@@ -1125,25 +1121,21 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
      */
     protected Processor createUnitOfWorkProcessor(Route route, Processor 
processor, Exchange exchange) {
         // and wrap it in a unit of work so the UoW is on the top, so the 
entire route will be in the same UoW
-        UnitOfWork parent = 
exchange.getProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, UnitOfWork.class);
-        if (parent != null) {
-            return 
internalProcessorFactory.addChildUnitOfWorkProcessorAdvice(camelContext, 
processor, route, parent);
-        } else {
-            return 
internalProcessorFactory.addUnitOfWorkProcessorAdvice(camelContext, processor, 
route);
-        }
+        return 
internalProcessorFactory.addUnitOfWorkProcessorAdvice(camelContext, processor, 
route);
     }
 
     /**
      * Prepares the exchange for participating in a shared unit of work
      * <p/>
-     * This ensures a child exchange can access its parent {@link UnitOfWork} 
when it participate in a shared unit of
+     * This ensures a child exchange can access its parent {@link UnitOfWork} 
when it participates in a shared unit of
      * work.
      *
      * @param childExchange  the child exchange
      * @param parentExchange the parent exchange
      */
     protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange 
parentExchange) {
-        childExchange.setProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, 
parentExchange.getUnitOfWork());
+        // share the unit of work on the child
+        
childExchange.getExchangeExtension().setUnitOfWork(parentExchange.getUnitOfWork());
     }
 
     @Override
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/issues/MulticastMixOriginalMessageBodyAndEnrichedHeadersTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/issues/MulticastMixOriginalMessageBodyAndEnrichedHeadersTest.java
index 8bc833dee35..8fc2c32a056 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/issues/MulticastMixOriginalMessageBodyAndEnrichedHeadersTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/issues/MulticastMixOriginalMessageBodyAndEnrichedHeadersTest.java
@@ -36,9 +36,8 @@ public class 
MulticastMixOriginalMessageBodyAndEnrichedHeadersTest extends Conte
 
                 onException(Exception.class).handled(true)
                         // we want to preserve the real original message body 
and
-                        // then include other headers that have been
-                        // set later during routing
-                        
.transform(simple("${exchangeProperty[CamelParentUnitOfWork].getOriginalInMessage().getBody()}"))
+                        // then include other headers that have been set later 
during routing
+                        .transform(simple("${originalBody}"))
                         .to("mock:b");
 
                 from("direct:start").setBody(constant("Changed 
body")).setHeader("foo", constant("bar")).multicast()
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java
new file mode 100644
index 00000000000..3cd672bc59d
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SplitterShareUnitOfWorkTest extends ContextTestSupport {
+
+    private final List<UnitOfWork> uows = new ArrayList<>();
+    private final List<UnitOfWork> doneUows = new ArrayList<>();
+    private final List<String> doneBodies = new ArrayList<>();
+
+    @Test
+    public void testShareUnitOfWork() throws Exception {
+        getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C");
+        getMockEndpoint("mock:result").expectedBodiesReceived("A+B+C");
+
+        template.sendBody("direct:start", "A,B,C");
+
+        assertMockEndpointsSatisfied();
+
+        Assertions.assertEquals(3, uows.size());
+
+        // all in-flight uows should be the same
+        Assertions.assertSame(uows.get(0), uows.get(1));
+        Assertions.assertSame(uows.get(1), uows.get(2));
+        Assertions.assertSame(uows.get(2), uows.get(0));
+
+        // and done uow should be the same
+        Assertions.assertSame(uows.get(0), doneUows.get(0));
+        Assertions.assertSame(uows.get(1), doneUows.get(1));
+        Assertions.assertSame(uows.get(2), doneUows.get(2));
+
+        // uow is done after the entire route so the exchange body is the 
output from the aggregation strategy
+        Assertions.assertEquals("A+B+C", doneBodies.get(0));
+        Assertions.assertEquals("A+B+C", doneBodies.get(1));
+        Assertions.assertEquals("A+B+C", doneBodies.get(2));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start").split(body(), new 
MyStrategy()).shareUnitOfWork()
+                        .process(e -> {
+                            var u = e.getUnitOfWork();
+                            uows.add(u);
+                            u.addSynchronization(new SynchronizationAdapter() {
+                                @Override
+                                public void onDone(Exchange exchange) {
+                                    var b = 
exchange.getMessage().getBody(String.class);
+                                    doneBodies.add(b);
+                                    var u = exchange.getUnitOfWork();
+                                    doneUows.add(u);
+
+                                    // should only be invoked after all is 
complete (3 line and 1 result)
+                                    Assertions.assertEquals(3, 
getMockEndpoint("mock:line").getReceivedCounter());
+                                    Assertions.assertEquals(1, 
getMockEndpoint("mock:result").getReceivedCounter());
+                                }
+                            });
+                        })
+                        .to("mock:line").end().to("mock:result");
+            }
+        };
+    }
+
+    private static class MyStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body = oldExchange.getIn().getBody() + "+" + 
newExchange.getIn().getBody();
+            oldExchange.getIn().setBody(body);
+            return oldExchange;
+        }
+    }
+
+}
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
index 78759b83a6a..c015d5e96dc 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
@@ -8,6 +8,17 @@ from both 4.0 to 4.1 and 4.1 to 4.2.
 
 === camel-core
 
+==== Splitter and Multicast EIPs
+
+When using `shareUnitOfWork=true` in Split or Multicast EIPs, Camel will 
now use a single shared `UnitOfWork` instance (parent)
+for the entire body of work. So if the Splitter is splitting into 1000 
sub-messages, then each of them will now reuse
+the same `UnitOfWork`, and any completion tasks added by each sub-message will 
now be executed later, when the parent `UnitOfWork`
+is complete, usually when the original message is completed.
+
+Previously, each sub-message was independent (despite the documentation stating 
that this was not the case). This incorrect behavior
+has mistakenly been in place for many years, as this feature is rarely in use. However, 
we had the opportunity to look into this as part
+of an issue, and felt it's better to fix this in time for this LTS release.
+
 === camel-jbang
 
 The `camel export` will not include `camel-observabilities-services` out of 
the box. To include this, then use `--observe` to enable

Reply via email to