Updated Branches: refs/heads/master cd82ef8b3 -> 8bcaa4524
CAMEL-6874: Avoid usage of the same EntityManager object across multiple threads as per se EntityManagers are not thread-safe. Also polished the code a bit as well as removed some unused/aimless logic by the JpaProducer#process() method. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bcaa452 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bcaa452 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bcaa452 Branch: refs/heads/master Commit: 8bcaa4524d57ca473593b7dff46c40f8172b2233 Parents: cd82ef8 Author: Babak Vahdat <bvah...@apache.org> Authored: Sat Oct 19 02:10:21 2013 +0200 Committer: Babak Vahdat <bvah...@apache.org> Committed: Sat Oct 19 02:10:21 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/jpa/JpaConsumer.java | 11 ++++- .../apache/camel/component/jpa/JpaEndpoint.java | 13 +----- .../apache/camel/component/jpa/JpaProducer.java | 45 ++++++++++---------- .../component/jpa/AbstractJpaMethodTest.java | 2 +- .../org/apache/camel/component/jpa/JpaTest.java | 2 +- .../camel/component/jpa/JpaUseMergeTest.java | 4 +- .../jpa/JpaWithNamedQueryAndParametersTest.java | 2 +- .../component/jpa/JpaWithNamedQueryTest.java | 9 ++-- 8 files changed, 45 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java index e94c113..91ffd29 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java @@ -44,7 +44,6 @@ import org.springframework.transaction.support.TransactionTemplate; * @version */ public class JpaConsumer extends ScheduledBatchPollingConsumer { - private static final Logger LOG = LoggerFactory.getLogger(JpaConsumer.class); private final JpaEndpoint endpoint; private final EntityManager entityManager; @@ -71,7 +70,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { public JpaConsumer(JpaEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; - this.entityManager = endpoint.getEntityManager(); + this.entityManager = endpoint.createEntityManager(); this.transactionTemplate = endpoint.createTransactionTemplate(); } @@ -441,4 +440,12 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { exchange.getIn().setHeader(JpaConstants.ENTITYMANAGER, entityManager); return exchange; } + + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + entityManager.close(); + LOG.trace("closed the EntityManager {} on {}", entityManager, this); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java index 78c64d2..97d7fdc 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java @@ -43,7 +43,6 @@ public class JpaEndpoint extends ScheduledPollEndpoint { private EntityManagerFactory entityManagerFactory; private PlatformTransactionManager transactionManager; - private EntityManager entityManager; private String persistenceUnit = "camel"; private Expression producerExpression; private int maximumResults = -1; @@ -263,11 +262,8 @@ public class JpaEndpoint extends ScheduledPollEndpoint { return tm; } - protected EntityManager getEntityManager() { - if (entityManager == null) { - entityManager = getEntityManagerFactory().createEntityManager(); - } - return entityManager; + protected EntityManager createEntityManager() { + return getEntityManagerFactory().createEntityManager(); } protected TransactionTemplate createTransactionTemplate() { @@ -302,9 +298,4 @@ public class JpaEndpoint extends ScheduledPollEndpoint { }; } - @Override - protected void doStop() throws Exception { - super.doStop(); -// entityManager.close(); - } } http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java index 8393a9e..cb981ce 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java @@ -16,15 +16,15 @@ */ package org.apache.camel.component.jpa; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import javax.persistence.EntityManager; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.impl.DefaultProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionTemplate; @@ -33,19 +33,23 @@ import org.springframework.transaction.support.TransactionTemplate; * @version */ public class JpaProducer extends DefaultProducer { + private static final Logger LOG = LoggerFactory.getLogger(JpaProducer.class); private final EntityManager entityManager; private final TransactionTemplate transactionTemplate; - private final JpaEndpoint endpoint; private final Expression expression; public JpaProducer(JpaEndpoint endpoint, Expression expression) { super(endpoint); - this.endpoint = endpoint; this.expression = expression; - this.entityManager = endpoint.getEntityManager(); + this.entityManager = endpoint.createEntityManager(); this.transactionTemplate = endpoint.createTransactionTemplate(); } + @Override + public JpaEndpoint getEndpoint() { + return (JpaEndpoint) super.getEndpoint(); + } + public void process(final Exchange exchange) { exchange.getIn().setHeader(JpaConstants.ENTITYMANAGER, entityManager); final Object values = expression.evaluate(exchange, Object.class); @@ -53,35 +57,24 @@ public class JpaProducer extends DefaultProducer { transactionTemplate.execute(new TransactionCallback<Object>() { public Object doInTransaction(TransactionStatus status) { entityManager.joinTransaction(); - if (values.getClass().isArray()) { Object[] array = (Object[])values; for (int index = 0; index < array.length; index++) { - Object managedEntity = save(array[index], entityManager); - if (!endpoint.isUsePersist()) { - array[index] = managedEntity; - } + save(array[index], entityManager); } } else if (values instanceof Collection) { - @SuppressWarnings("unchecked") - Collection<Object> collection = (Collection<Object>)values; - List<Object> managedEntities = new ArrayList<Object>(); + Collection<?> collection = (Collection<?>)values; for (Object entity : collection) { - Object managedEntity = save(entity, entityManager); - managedEntities.add(managedEntity); - } - if (!endpoint.isUsePersist()) { - collection.clear(); - collection.addAll(managedEntities); + save(entity, entityManager); } } else { Object managedEntity = save(values, entityManager); - if (!endpoint.isUsePersist()) { + if (!getEndpoint().isUsePersist()) { exchange.getIn().setBody(managedEntity); } } - if (endpoint.isFlushOnSend()) { + if (getEndpoint().isFlushOnSend()) { // there may be concurrency so need to join tx before flush entityManager.joinTransaction(); entityManager.flush(); @@ -97,7 +90,7 @@ public class JpaProducer extends DefaultProducer { private Object save(final Object entity, EntityManager entityManager) { // there may be concurrency so need to join tx before persist/merge entityManager.joinTransaction(); - if (endpoint.isUsePersist()) { + if (getEndpoint().isUsePersist()) { entityManager.persist(entity); return entity; } else { @@ -108,4 +101,12 @@ public class JpaProducer extends DefaultProducer { } exchange.getIn().removeHeader(JpaConstants.ENTITYMANAGER); } + + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + entityManager.close(); + LOG.trace("closed the EntityManager {} on {}", entityManager, this); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java index 6f8db08..bc0aca4 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java @@ -148,7 +148,7 @@ public abstract class AbstractJpaMethodTest extends CamelTestSupport { endpoint = context.getEndpoint(endpointUri, JpaEndpoint.class); transactionTemplate = endpoint.createTransactionTemplate(); - entityManager = endpoint.getEntityManager(); + entityManager = endpoint.createEntityManager(); transactionTemplate.execute(new TransactionCallback<Object>() { public Object doInTransaction(TransactionStatus status) { http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java index 7125526..8977271 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java @@ -118,7 +118,7 @@ public class JpaTest extends Assert { endpoint = (JpaEndpoint) value; transactionTemplate = endpoint.createTransactionTemplate(); - entityManager = endpoint.getEntityManager(); + entityManager = endpoint.createEntityManager(); } protected String getEndpointUri() { http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaUseMergeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaUseMergeTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaUseMergeTest.java index 1a5fe17..8106c5a 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaUseMergeTest.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaUseMergeTest.java @@ -49,7 +49,9 @@ public class JpaUseMergeTest extends AbstractJpaMethodTest { assertEntitiesInDatabase(1, Customer.class.getName()); assertEntitiesInDatabase(1, Address.class.getName()); - + + // do detach the persisted entity first before modifying it + entityManager.detach(customer); customer.setName("Max Mustermann"); customer.getAddress().setAddressLine1("Musterstr. 1"); customer.getAddress().setAddressLine2("11111 Enterhausen"); http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java index 8494cc6..667ab99 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java @@ -138,7 +138,7 @@ public class JpaWithNamedQueryAndParametersTest extends Assert { endpoint = (JpaEndpoint)value; transactionTemplate = endpoint.createTransactionTemplate(); - entityManager = endpoint.getEntityManager(); + entityManager = endpoint.createEntityManager(); } protected String getEndpointUri() { http://git-wip-us.apache.org/repos/asf/camel/blob/8bcaa452/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java index 85126e4..d6bae56 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java @@ -111,10 +111,12 @@ public class JpaWithNamedQueryTest extends Assert { transactionTemplate.execute(new TransactionCallback<Object>() { public Object doInTransaction(TransactionStatus status) { - entityManager.joinTransaction(); + // make use of the EntityManager having the relevant persistence-context + EntityManager entityManager2 = receivedExchange.getIn().getHeader(JpaConstants.ENTITYMANAGER, EntityManager.class); + entityManager2.joinTransaction(); // now lets assert that there are still 2 entities left - List<?> rows = entityManager.createQuery("select x from MultiSteps x").getResultList(); + List<?> rows = entityManager2.createQuery("select x from MultiSteps x").getResultList(); assertEquals("Number of entities: " + rows, 2, rows.size()); int counter = 1; @@ -125,7 +127,6 @@ public class JpaWithNamedQueryTest extends Assert { if (row.getAddress().equals("f...@bar.com")) { LOG.info("Found updated row: " + row); - assertEquals("Updated row step for: " + row, getUpdatedStepValue(), row.getStep()); } else { // dummy row @@ -166,7 +167,7 @@ public class JpaWithNamedQueryTest extends Assert { endpoint = (JpaEndpoint)value; transactionTemplate = endpoint.createTransactionTemplate(); - entityManager = endpoint.getEntityManager(); + entityManager = endpoint.createEntityManager(); } protected String getEndpointUri() {