This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.14.x by this push:
     new 20764a2811c1 CAMEL-22534: JPA works with parallel splitter by default 
(#19538)
20764a2811c1 is described below

commit 20764a2811c11ee587fb23fbced9369cf0cd4d82
Author: JiriOndrusek <[email protected]>
AuthorDate: Tue Oct 14 07:38:17 2025 +0200

    CAMEL-22534: JPA works with parallel splitter by default (#19538)
---
 .../apache/camel/component/jpa/JpaConstants.java   |  3 +-
 .../processor/jpa/JpaParallelSplitterTest.java     | 88 ++++++++++++++++++++++
 .../org/apache/camel/ExchangeConstantProvider.java |  3 +-
 .../src/main/java/org/apache/camel/Exchange.java   |  2 +
 .../java/org/apache/camel/processor/Splitter.java  |  6 ++
 5 files changed, 100 insertions(+), 2 deletions(-)

diff --git 
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
 
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
index ad0c761777e9..4273b1394f46 100644
--- 
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
+++ 
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.jpa;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.spi.Metadata;
 
 /**
@@ -24,7 +25,7 @@ import org.apache.camel.spi.Metadata;
 public final class JpaConstants {
 
     @Metadata(description = "The JPA `EntityManager` object.", javaType = 
"jakarta.persistence.EntityManager")
-    public static final String ENTITY_MANAGER = "CamelEntityManager";
+    public static final String ENTITY_MANAGER = Exchange.JPA_ENTITY_MANAGER;
     @Metadata(label = "producer", description = "Alternative way for passing 
query parameters as an Exchange header.",
               javaType = "Map<String, Object>")
     public static final String JPA_PARAMETERS_HEADER = "CamelJpaParameters";
diff --git 
a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaParallelSplitterTest.java
 
b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaParallelSplitterTest.java
new file mode 100644
index 000000000000..ca6ac47119ca
--- /dev/null
+++ 
b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaParallelSplitterTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.jpa;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.examples.SendEmail;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class JpaParallelSplitterTest extends AbstractJpaTest {
+    protected static final String SELECT_ALL_STRING = "select x from " + 
SendEmail.class.getName() + " x";
+
+    @Test
+    public void testParallelSplitter() throws Exception {
+        Future<Object> future = template.asyncRequestBody("direct:splitter", 
"[email protected]");
+
+        future.get(10, TimeUnit.SECONDS);
+
+        assertCorrectEmails();
+    }
+
+    private void assertCorrectEmails() {
+        List<?> results = entityManager
+                .createQuery("select e from " + SendEmail.class.getName() + " 
e WHERE e.address = '[email protected]'")
+                .getResultList();
+        assertEquals(50, results.size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+
+                from("direct:splitter?timeout=1000")
+                        .loop(50)
+                        .setHeader("loopIndex", 
exchangeProperty(Exchange.LOOP_INDEX))
+                        .process(ex -> {
+                            int index = ex.getIn().getHeader("loopIndex", 
Integer.class);
+                            SendEmail se = new SendEmail(index + "@wrong.org");
+                            ex.getIn().setBody(se);
+                        })
+                        .to("jpa://" + SendEmail.class.getName())
+                        .end()
+                        //select all
+                        .to("jpa://" + SendEmail.class.getName() + 
"?query=SELECT e FROM SendEmail e")
+                        .split(body())
+                        .parallelProcessing()
+                        .threads(10, 10)
+                        .process(ex -> {
+                            SendEmail se = ex.getIn().getBody(SendEmail.class);
+                            se.setAddress("[email protected]");
+                        })
+                        .to("jpa://" + SendEmail.class.getName());
+
+            }
+        };
+    }
+
+    @Override
+    protected String routeXml() {
+        return "org/apache/camel/processor/jpa/springJpaRouteTest.xml";
+    }
+
+    @Override
+    protected String selectAllString() {
+        return SELECT_ALL_STRING;
+    }
+}
diff --git 
a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
 
b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
index a9e14a38e2bd..2af6074c9445 100644
--- 
a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
+++ 
b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
@@ -13,7 +13,7 @@ public class ExchangeConstantProvider {
 
     private static final Map<String, String> MAP;
     static {
-        Map<String, String> map = new HashMap<>(163);
+        Map<String, String> map = new HashMap<>(164);
         map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType");
         map.put("AGGREGATED_COLLECTION_GUARD", 
"CamelAggregatedCollectionGuard");
         map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy");
@@ -104,6 +104,7 @@ public class ExchangeConstantProvider {
         map.put("INTERCEPTED_ROUTE_ENDPOINT_URI", 
"CamelInterceptedParentEndpointUri");
         map.put("INTERCEPTED_ROUTE_ID", "CamelInterceptedRouteId");
         map.put("INTERRUPTED", "CamelInterrupted");
+        map.put("JPA_ENTITY_MANAGER", "CamelEntityManager");
         map.put("LANGUAGE_SCRIPT", "CamelLanguageScript");
         map.put("LOG_DEBUG_BODY_MAX_CHARS", "CamelLogDebugBodyMaxChars");
         map.put("LOG_DEBUG_BODY_STREAMS", "CamelLogDebugStreams");
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java 
b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index abc96cfc2a09..767b17a50124 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -218,6 +218,8 @@ public interface Exchange extends VariableAware {
     @Deprecated(since = "3.1.0")
     String INTERRUPTED = "CamelInterrupted";
 
+    String JPA_ENTITY_MANAGER = "CamelEntityManager";
+
     String LANGUAGE_SCRIPT = "CamelLanguageScript";
     String LOG_DEBUG_BODY_MAX_CHARS = "CamelLogDebugBodyMaxChars";
     String LOG_DEBUG_BODY_STREAMS = "CamelLogDebugStreams";
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index c411183f9d4d..04bec6aed8a0 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -320,6 +320,12 @@ public class Splitter extends MulticastProcessor 
implements AsyncProcessor, Trac
             // we do not want to copy the message history for split 
sub-messages
             answer.removeProperty(ExchangePropertyKey.MESSAGE_HISTORY);
         }
+
+        if (isParallelProcessing()) {
+            //we do not want to copy JPA entityManager (which is not meant for 
concurrent use) in parallel mode
+            //jpa component takes care of the entityManager if property is 
removed
+            answer.removeProperty(Exchange.JPA_ENTITY_MANAGER);
+        }
         return answer;
     }
 }

Reply via email to