This is an automated email from the ASF dual-hosted git repository. zhfeng pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.18.x by this push: new c18f49b9589 CAMEL-18377: camel-jpa - resue an EntityManager from the current tran… (#8141) c18f49b9589 is described below commit c18f49b9589a451ce7013c9a5bc2cbac5a8a9b61 Author: Zheng Feng <zh.f...@gmail.com> AuthorDate: Tue Aug 16 14:03:49 2022 +0800 CAMEL-18377: camel-jpa - resue an EntityManager from the current tran… (#8141) * CAMEL-18377: camel-jpa - reuse an EntityManager in the current transaction if possible * Fix to create txData only if the exchange is transacted * Add a unit test with pooling dataSource for transactd() combined with split() * Fix in multicast * Fix in RecipientList * Add a Enrich test * Fix CS --- components/camel-jpa/pom.xml | 5 ++ .../org/apache/camel/component/jpa/JpaHelper.java | 39 +++++++++ .../camel/processor/jpa/JpaTransactedTest.java | 94 ++++++++++++++++++++++ .../src/test/resources/META-INF/persistence.xml | 17 ++++ .../processor/jpa/springJpaRoutePoolingTest.xml | 36 +++++++++ .../org/apache/camel/ExchangeConstantProvider.java | 3 +- .../src/main/java/org/apache/camel/Exchange.java | 1 + .../apache/camel/processor/MulticastProcessor.java | 10 ++- .../camel/processor/RecipientListProcessor.java | 12 +++ .../java/org/apache/camel/processor/Splitter.java | 12 +++ 10 files changed, 227 insertions(+), 2 deletions(-) diff --git a/components/camel-jpa/pom.xml b/components/camel-jpa/pom.xml index da76413ad8c..66fd78568f5 100644 --- a/components/camel-jpa/pom.xml +++ b/components/camel-jpa/pom.xml @@ -82,6 +82,11 @@ <artifactId>spring-jdbc</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-dbcp2</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java index 463c888b1ce..0034aa67559 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java @@ -60,6 +60,14 @@ public final class JpaHelper { em = getEntityManagerMap(exchange).get(getKey(entityManagerFactory)); } + // then try reuse any entity manager from the transaction context + if (em == null && exchange != null && exchange.isTransacted()) { + Map<String, Object> data = getTransactionContextData(exchange); + if (data != null) { + em = (EntityManager) data.get(getKey(entityManagerFactory)); + } + } + if (em == null && useSharedEntityManager) { em = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); } @@ -75,6 +83,21 @@ public final class JpaHelper { return em; } + /** + * Copy JpaConstants.ENTITY_MANAGER property from source to target exchange. + * + * @param target The target exchange + * @param source The source exchange + */ + public static void copyEntityManagers(Exchange target, Exchange source) { + if (target != null && source != null && target.getProperty(JpaConstants.ENTITY_MANAGER) == null) { + Map<String, EntityManager> entityManagers = source.getProperty(JpaConstants.ENTITY_MANAGER, Map.class); + if (entityManagers != null) { + target.setProperty(JpaConstants.ENTITY_MANAGER, entityManagers); + } + } + } + private static EntityManager createEntityManager(Exchange exchange, EntityManagerFactory entityManagerFactory) { EntityManager em; em = entityManagerFactory.createEntityManager(); @@ -82,11 +105,27 @@ public final class JpaHelper { // we want to reuse the EM so store as property and make sure we close it when done with the exchange Map<String, EntityManager> entityManagers = getEntityManagerMap(exchange); entityManagers.put(getKey(entityManagerFactory), em); + + // we want to reuse the EM in the same transaction + if (exchange.isTransacted()) { + Map<String, Object> data = getTransactionContextData(exchange); + if (data != null) { + data.put(getKey(entityManagerFactory), em); + } + } exchange.adapt(ExtendedExchange.class).addOnCompletion(new JpaCloseEntityManagerOnCompletion(em)); } return em; } + private static Map<String, Object> getTransactionContextData(Exchange exchange) { + Map<String, Object> data = null; + if (exchange.isTransacted()) { + data = exchange.getProperty(Exchange.TRANSACTION_CONTEXT_DATA, Map.class); + } + return data; + } + @SuppressWarnings("unchecked") private static Map<String, EntityManager> getEntityManagerMap(Exchange exchange) { Map<String, EntityManager> entityManagers = exchange.getProperty(JpaConstants.ENTITY_MANAGER, Map.class); diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTransactedTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTransactedTest.java new file mode 100644 index 00000000000..61f64e86b4b --- /dev/null +++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTransactedTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.jpa; + +import java.util.Arrays; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jpa.JpaHelper; +import org.apache.camel.examples.SendEmail; +import org.junit.jupiter.api.Test; + +public class JpaTransactedTest extends AbstractJpaTest { + protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName() + " x"; + + @Test + public void testTransactedSplit() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(2); + template.sendBody("direct:split", Arrays.asList( + new SendEmail("te...@example.org"), new SendEmail("te...@example.org"))); + assertMockEndpointsSatisfied(); + } + + @Test + public void testTransactedMulticast() throws Exception { + template.sendBody("direct:multicast", new SendEmail("t...@example.org")); + } + + @Test + public void testTransactedRecipientList() throws Exception { + template.sendBody("direct:recipient", new SendEmail("t...@example.org")); + } + + @Test + public void testTransactedEnrich() throws Exception { + template.sendBody("direct:enrich", new SendEmail("t...@example.org")); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:split") + .transacted().split().body() + .to("jpa://" + SendEmail.class.getName()) + .to("mock:result"); + + from("direct:multicast") + .transacted().multicast() + .to("jpa://" + SendEmail.class.getName(), "jpa://" + SendEmail.class.getName()); + + from("direct:recipient") + .transacted().recipientList( + constant("jpa://" + SendEmail.class.getName() + "," + "jpa://" + SendEmail.class.getName())); + + from("direct:enrich") + .transacted().enrich("jpa://" + SendEmail.class.getName(), new AggregationStrategy() { + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + JpaHelper.copyEntityManagers(oldExchange, newExchange); + return oldExchange; + } + }) + .to("jpa://" + SendEmail.class.getName()); + } + }; + } + + @Override + protected String routeXml() { + return "org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml"; + } + + @Override + protected String selectAllString() { + return SELECT_ALL_STRING; + } +} diff --git a/components/camel-jpa/src/test/resources/META-INF/persistence.xml b/components/camel-jpa/src/test/resources/META-INF/persistence.xml index f0c146b1d63..76f6cd393a1 100644 --- a/components/camel-jpa/src/test/resources/META-INF/persistence.xml +++ b/components/camel-jpa/src/test/resources/META-INF/persistence.xml @@ -63,6 +63,23 @@ </properties> </persistence-unit> + <persistence-unit name="pooling" transaction-type="RESOURCE_LOCAL"> + <class>org.apache.camel.examples.SendEmail</class> + + <properties> + <property name="openjpa.ConnectionProperties" + value="DriverClassName=org.apache.derby.jdbc.EmbeddedDriver, + Url=jdbc:derby:target/custom;create=true, + MaxTotal=1, + MaxWaitMillis=1000, + TestOnBorrow=true"/> + <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp2.BasicDataSource"/> + <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema"/> + <property name="openjpa.Log" value="DefaultLevel=WARN, Tool=INFO"/> + <property name="openjpa.Multithreaded" value="true"/> + </properties> + </persistence-unit> + <!-- START SNIPPET: e1 --> <persistence-unit name="idempotentDb" transaction-type="RESOURCE_LOCAL"> <class>org.apache.camel.processor.idempotent.jpa.MessageProcessed</class> diff --git a/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml new file mode 100644 index 00000000000..ce4232d3b2f --- /dev/null +++ b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRoutePoolingTest.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> + + <bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean"> + <property name="persistenceUnitName" value="pooling"/> + </bean> + + <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager"> + <property name="entityManagerFactory" ref="entityManagerFactory"/> + </bean> + + <bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate"> + <property name="transactionManager" ref="transactionManager"/> + </bean> + +</beans> diff --git a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java index d495cac7f9a..4fd9528ca69 100644 --- a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java +++ b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java @@ -11,7 +11,7 @@ public class ExchangeConstantProvider { private static final Map<String, String> MAP; static { - Map<String, String> map = new HashMap<>(154); + Map<String, String> map = new HashMap<>(155); map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType"); map.put("AGGREGATED_COLLECTION_GUARD", "CamelAggregatedCollectionGuard"); map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy"); @@ -159,6 +159,7 @@ public class ExchangeConstantProvider { map.put("TRACE_EVENT_TIMESTAMP", "CamelTraceEventTimestamp"); map.put("TRACING_HEADER_FORMAT", "CamelTracingHeaderFormat"); map.put("TRACING_OUTPUT_FORMAT", "CamelTracingOutputFormat"); + map.put("TRANSACTION_CONTEXT_DATA", "CamelTransactionContextData"); map.put("TRANSFER_ENCODING", "Transfer-Encoding"); map.put("TRY_ROUTE_BLOCK", "TryRouteBlock"); map.put("UNIT_OF_WORK_EXHAUSTED", "CamelUnitOfWorkExhausted"); diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index 3ba0e4b5edb..7e248c0e578 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -254,6 +254,7 @@ public interface Exchange { String TRACING_HEADER_FORMAT = "CamelTracingHeaderFormat"; @Deprecated String TRACING_OUTPUT_FORMAT = "CamelTracingOutputFormat"; + String TRANSACTION_CONTEXT_DATA = "CamelTransactionContextData"; String TRY_ROUTE_BLOCK = "TryRouteBlock"; String TRANSFER_ENCODING = "Transfer-Encoding"; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index fb231ab762c..2cc448af400 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -899,6 +899,7 @@ public class MulticastProcessor extends AsyncProcessorSupport protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { List<ProcessorExchangePair> result = new ArrayList<>(processors.size()); + Map<String, Object> txData = null; StreamCache streamCache = null; if (isParallelProcessing() && exchange.getIn().getBody() instanceof StreamCache) { @@ -911,7 +912,14 @@ public class MulticastProcessor extends AsyncProcessorSupport // copy exchange, and do not share the unit of work Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted()); - + // If we are in a transaction, set TRANSACTION_CONTEXT_DATA property for new exchanges to share txData + // during the transaction. + if (exchange.isTransacted() && copy.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) { + if (txData == null) { + txData = new ConcurrentHashMap<>(); + } + copy.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, txData); + } if (streamCache != null) { if (index > 0) { // copy it otherwise parallel processing is not possible, diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index 7dbd3741e87..8c1a735a86d 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import org.apache.camel.AggregationStrategy; @@ -69,6 +71,7 @@ public class RecipientListProcessor extends MulticastProcessor { private final String delimiter; private final ProducerCache producerCache; private int cacheSize; + private Map<String, Object> txData; /** * Class that represent each step in the recipient list to do @@ -290,6 +293,15 @@ public class RecipientListProcessor extends MulticastProcessor { Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted()); + // If we are in a transaction, set TRANSACTION_CONTEXT_DATA property for new exchanges to share txData + // during the transaction. + if (exchange.isTransacted() && copy.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) { + if (txData == null) { + txData = new ConcurrentHashMap<>(); + } + copy.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, txData); + } + // if we share unit of work, we need to prepare the child exchange if (isShareUnitOfWork()) { prepareSharedUnitOfWork(copy, exchange); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java index 4b9b7adf2e1..597bc007391 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java @@ -23,6 +23,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import org.apache.camel.AggregationStrategy; @@ -203,6 +205,8 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac private int index; private boolean closed; + private Map<String, Object> txData; + public boolean hasNext() { if (closed) { return false; @@ -229,6 +233,14 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac // and do not share the unit of work Exchange newExchange = processorExchangeFactory.createCorrelatedCopy(copy, false); newExchange.adapt(ExtendedExchange.class).setTransacted(original.isTransacted()); + // If we are in a transaction, set TRANSACTION_CONTEXT_DATA property for new exchanges to share txData + // during the transaction. + if (original.isTransacted() && newExchange.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) { + if (txData == null) { + txData = new ConcurrentHashMap<>(); + } + newExchange.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, txData); + } // If the splitter has an aggregation strategy // then the StreamCache created by the child routes must not be // closed by the unit of work of the child route, but by the unit of