Repository: camel Updated Branches: refs/heads/master 23b975a8f -> 3a98dd9d7
CAMEL-8054: Added option to share enity manager to camel-jpa. Thanks to Chriss Watts for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3a98dd9d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3a98dd9d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3a98dd9d Branch: refs/heads/master Commit: 3a98dd9d79aab778276ca96969599c49be82ec17 Parents: 23b975a Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Apr 14 17:16:21 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Apr 14 17:16:21 2015 +0200 ---------------------------------------------------------------------- .../camel/component/jpa/JpaComponent.java | 14 ++ .../apache/camel/component/jpa/JpaConsumer.java | 16 ++- .../apache/camel/component/jpa/JpaEndpoint.java | 21 ++- .../apache/camel/component/jpa/JpaHelper.java | 8 +- .../apache/camel/component/jpa/JpaProducer.java | 3 +- .../idempotent/jpa/JpaMessageIdRepository.java | 17 ++- .../jpa/JpaRouteSharedEntityManagerTest.java | 130 +++++++++++++++++++ 7 files changed, 200 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java index 73416e2..c2c0227 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java @@ -37,6 +37,7 @@ public class JpaComponent extends UriEndpointComponent { private EntityManagerFactory entityManagerFactory; private PlatformTransactionManager transactionManager; private boolean joinTransaction = true; + private boolean sharedEntityManager; public JpaComponent() { super(JpaEndpoint.class); @@ -80,6 +81,18 @@ public class JpaComponent extends UriEndpointComponent { this.joinTransaction = joinTransaction; } + public boolean isSharedEntityManager() { + return sharedEntityManager; + } + + /** + * Whether to use Spring's SharedEntityManager for the consumer/producer. + * Note in most cases joinTransaction should be set to false as this is not an EXTENDED EntityManager. + */ + public void setSharedEntityManager(boolean sharedEntityManager) { + this.sharedEntityManager = sharedEntityManager; + } + // Implementation methods //------------------------------------------------------------------------- @@ -87,6 +100,7 @@ public class JpaComponent extends UriEndpointComponent { protected Endpoint createEndpoint(String uri, String path, Map<String, Object> options) throws Exception { JpaEndpoint endpoint = new JpaEndpoint(uri, this); endpoint.setJoinTransaction(isJoinTransaction()); + endpoint.setSharedEntityManager(isSharedEntityManager()); // lets interpret the next string as a class if (ObjectHelper.isNotEmpty(path)) { http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java index 9824f7d..2bc82a2 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; + import javax.persistence.Entity; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; @@ -39,6 +40,7 @@ import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.orm.jpa.SharedEntityManagerCreator; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionTemplate; @@ -500,7 +502,12 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { @Override protected void doStart() throws Exception { super.doStart(); - this.entityManager = entityManagerFactory.createEntityManager(); + + if (getEndpoint().isSharedEntityManager()) { + this.entityManager = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); + } else { + this.entityManager = entityManagerFactory.createEntityManager(); + } LOG.trace("Created EntityManager {} on {}", entityManager, this); } @@ -512,7 +519,10 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { @Override protected void doShutdown() throws Exception { super.doShutdown(); - this.entityManager.close(); - LOG.trace("Closed EntityManager {} on {}", entityManager, this); + + if (entityManager != null) { + this.entityManager.close(); + LOG.trace("Closed EntityManager {} on {}", entityManager, this); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java index 578689f..e725758 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java @@ -38,6 +38,7 @@ import org.apache.camel.util.IntrospectionSupport; import org.apache.camel.util.ObjectHelper; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.LocalEntityManagerFactoryBean; +import org.springframework.orm.jpa.SharedEntityManagerCreator; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.support.TransactionTemplate; @@ -56,6 +57,8 @@ public class JpaEndpoint extends ScheduledPollEndpoint { private String persistenceUnit = "camel"; @UriParam(defaultValue = "true") private boolean joinTransaction = true; + @UriParam + private boolean sharedEntityManager; @UriParam(label = "consumer", defaultValue = "-1") private int maximumResults = -1; @UriParam(label = "consumer", defaultValue = "true") @@ -314,6 +317,18 @@ public class JpaEndpoint extends ScheduledPollEndpoint { this.usePassedInEntityManager = usePassedIn; } + public boolean isSharedEntityManager() { + return sharedEntityManager; + } + + /** + * Whether to use Spring's SharedEntityManager for the consumer/producer. + * Note in most cases joinTransaction should be set to false as this is not an EXTENDED EntityManager. + */ + public void setSharedEntityManager(boolean sharedEntityManager) { + this.sharedEntityManager = sharedEntityManager; + } + // Implementation methods // ------------------------------------------------------------------------- @@ -340,7 +355,11 @@ public class JpaEndpoint extends ScheduledPollEndpoint { */ @Deprecated protected EntityManager createEntityManager() { - return getEntityManagerFactory().createEntityManager(); + if (sharedEntityManager) { + return SharedEntityManagerCreator.createSharedEntityManager(getEntityManagerFactory()); + } else { + return getEntityManagerFactory().createEntityManager(); + } } protected TransactionTemplate createTransactionTemplate() { http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java index 81df51b..44a5913 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java @@ -20,6 +20,7 @@ import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import org.apache.camel.Exchange; +import org.springframework.orm.jpa.SharedEntityManagerCreator; /** * Helper for JPA. @@ -36,10 +37,11 @@ public final class JpaHelper { * @param entityManagerFactory the entity manager factory (mandatory) * @param usePassedInEntityManager whether to use an existing {@link javax.persistence.EntityManager} which has been stored * on the exchange in the header with key {@link org.apache.camel.component.jpa.JpaConstants#ENTITY_MANAGER} + * @param useSharedEntityManager whether to use SharedEntityManagerCreator if not already passed in * @return the entity manager (is never null) */ public static EntityManager getTargetEntityManager(Exchange exchange, EntityManagerFactory entityManagerFactory, - boolean usePassedInEntityManager) { + boolean usePassedInEntityManager, boolean useSharedEntityManager) { EntityManager em = null; // favor using entity manager provided as a header from the end user @@ -52,6 +54,10 @@ public final class JpaHelper { em = exchange.getProperty(JpaConstants.ENTITY_MANAGER, EntityManager.class); } + if (em == null && useSharedEntityManager) { + em = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); + } + if (em == null) { // create a new entity manager em = entityManagerFactory.createEntityManager(); http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java index 2fee422..cf96488 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java @@ -54,7 +54,8 @@ public class JpaProducer extends DefaultProducer { public void process(final Exchange exchange) { // resolve the entity manager before evaluating the expression - final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, getEndpoint().isUsePassedInEntityManager()); + final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, + getEndpoint().isUsePassedInEntityManager(), getEndpoint().isSharedEntityManager()); final Object values = expression.evaluate(exchange, Object.class); if (values != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java index e352cff..4af8658 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java @@ -50,6 +50,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId private final EntityManagerFactory entityManagerFactory; private final TransactionTemplate transactionTemplate; private boolean joinTransaction = true; + private boolean sharedEntityManager; public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory, String processorName) { this(entityManagerFactory, createTransactionTemplate(entityManagerFactory), processorName); @@ -83,7 +84,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId @Override public boolean add(final Exchange exchange, final String messageId) { - final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true); + final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager); // Run this in single transaction. Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { @@ -118,7 +119,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId @Override public boolean contains(final Exchange exchange, final String messageId) { - final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true); + final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager); // Run this in single transaction. Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { @@ -147,7 +148,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId @Override public boolean remove(final Exchange exchange, final String messageId) { - final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true); + final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager); Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { public Boolean doInTransaction(TransactionStatus status) { @@ -203,6 +204,15 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId this.joinTransaction = joinTransaction; } + @ManagedAttribute(description = "Whether to use shared EntityManager") + public boolean isSharedEntityManager() { + return sharedEntityManager; + } + + public void setSharedEntityManager(boolean sharedEntityManager) { + this.sharedEntityManager = sharedEntityManager; + } + @Override protected void doStart() throws Exception { // noop @@ -212,4 +222,5 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId protected void doStop() throws Exception { // noop } + } http://git-wip-us.apache.org/repos/asf/camel/blob/3a98dd9d/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java new file mode 100644 index 0000000..c534da1 --- /dev/null +++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.jpa; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.examples.SendEmail; +import org.apache.camel.spring.SpringRouteBuilder; +import org.junit.Test; +import org.springframework.context.expression.BeanFactoryResolver; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.orm.jpa.LocalEntityManagerFactoryBean; + +import static org.hamcrest.CoreMatchers.equalTo; + +/** + * @version + */ +public class JpaRouteSharedEntityManagerTest extends AbstractJpaTest { + protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName() + " x"; + private CountDownLatch latch = new CountDownLatch(1); + + @Test + public void testRouteJpaShared() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + int countStart = getBrokerCount(); + assertThat("brokerCount", countStart, equalTo(1)); + + template.sendBody("direct:startShared", new SendEmail("o...@somewhere.org")); + // start route + context.startRoute("jpaShared"); + + // not the cleanest way to check the number of open connections + int countEnd = getBrokerCount(); + assertThat("brokerCount", countEnd, equalTo(1)); + + latch.countDown(); + + assertMockEndpointsSatisfied(); + } + + private int getBrokerCount() { + LocalEntityManagerFactoryBean entityManagerFactory = applicationContext.getBean("&entityManagerFactory", LocalEntityManagerFactoryBean.class); + + //uses Spring EL so we don't need to reference the classes + StandardEvaluationContext context = new StandardEvaluationContext(entityManagerFactory); + context.setBeanResolver(new BeanFactoryResolver(applicationContext)); + SpelExpressionParser parser = new SpelExpressionParser(); + Expression expression = parser.parseExpression("nativeEntityManagerFactory.brokerFactory.openBrokers"); + List<?> brokers = expression.getValue(context, List.class); + + return brokers.size(); + } + + @Test + public void testRouteJpaNotShared() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + template.sendBody("direct:startNotshared", new SendEmail("o...@somewhere.org")); + + int countStart = getBrokerCount(); + assertThat("brokerCount", countStart, equalTo(1)); + + // start route + context.startRoute("jpaOwn"); + + // not the cleanest way to check the number of open connections + int countEnd = getBrokerCount(); + assertThat("brokerCount", countEnd, equalTo(2)); + + latch.countDown(); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new SpringRouteBuilder() { + public void configure() { + from("direct:startNotshared").to("jpa://" + SendEmail.class.getName() + "?"); + from("direct:startShared").to("jpa://" + SendEmail.class.getName() + "?sharedEntityManager=true&joinTransaction=false"); + from("jpa://" + SendEmail.class.getName() + "?sharedEntityManager=true&joinTransaction=false").routeId("jpaShared").autoStartup(false).process(new LatchProcessor()).to("mock:result"); + from("jpa://" + SendEmail.class.getName() + "?sharedEntityManager=false").routeId("jpaOwn").autoStartup(false).process(new LatchProcessor()).to("mock:result"); + } + }; + } + + @Override + protected String routeXml() { + return "org/apache/camel/processor/jpa/springJpaRouteTest.xml"; + } + + @Override + protected String selectAllString() { + return SELECT_ALL_STRING; + } + + private class LatchProcessor implements Processor { + @Override + public void process(Exchange exchange) throws Exception { + latch.await(2, TimeUnit.SECONDS); + } + } +} +