CAMEL-5932 updated consumer and producer for direct EntityManager usage

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1efc537f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1efc537f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1efc537f

Branch: refs/heads/master
Commit: 1efc537f8752772f54c1e2e1ff84bf733ba1d055
Parents: 708c4cd
Author: Brett Meyer <br...@3riverdev.com>
Authored: Thu Aug 8 14:07:33 2013 -0400
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Aug 21 15:06:32 2013 +0200

----------------------------------------------------------------------
 .../camel/component/jpa/JpaConstants.java       |  2 +-
 .../apache/camel/component/jpa/JpaConsumer.java | 28 ++++--
 .../apache/camel/component/jpa/JpaEndpoint.java | 45 +++++-----
 .../apache/camel/component/jpa/JpaProducer.java | 31 ++++---
 .../jpa/JpaTemplateTransactionStrategy.java     | 89 --------------------
 .../component/jpa/TransactionStrategy.java      | 33 --------
 .../idempotent/jpa/JpaMessageIdRepository.java  | 54 +++++++-----
 7 files changed, 96 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
index ed5b922..3e42d9f 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
@@ -23,7 +23,7 @@ package org.apache.camel.component.jpa;
  */
 public final class JpaConstants {
 
-    public static final String JPA_TEMPLATE = "CamelJpaTemplate";
+    public static final String ENTITYMANAGER = "CamelEntityManager";
 
     private JpaConstants() {
         // utility class

http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index 5f81041..10cdd2d 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -36,7 +36,9 @@ import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.orm.jpa.JpaCallback;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
 
 /**
  * @version 
@@ -45,7 +47,8 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JpaConsumer.class);
     private final JpaEndpoint endpoint;
-    private final TransactionStrategy template;
+       private final EntityManager entityManager;
+    private final TransactionTemplate transactionTemplate;
     private QueryFactory queryFactory;
     private DeleteHandler<Object> deleteHandler;
     private String query;
@@ -66,7 +69,8 @@ public class JpaConsumer extends 
ScheduledBatchPollingConsumer {
     public JpaConsumer(JpaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
-        this.template = endpoint.createTransactionStrategy();
+        this.entityManager = endpoint.createEntityManager();
+        this.transactionTemplate = endpoint.createTransactionTemplate();
     }
 
     @Override
@@ -75,9 +79,11 @@ public class JpaConsumer extends 
ScheduledBatchPollingConsumer {
         shutdownRunningTask = null;
         pendingExchanges = 0;
 
-        Object messagePolled = template.execute(new JpaCallback<Object>() {
-            public Object doInJpa(EntityManager entityManager) throws 
PersistenceException {
-                Queue<DataHolder> answer = new LinkedList<DataHolder>();
+        Object messagePolled = transactionTemplate.execute(new 
TransactionCallback<Object>() {
+            public Object doInTransaction(TransactionStatus status) {
+               entityManager.joinTransaction();
+               
+               Queue<DataHolder> answer = new LinkedList<DataHolder>();
 
                 Query query = getQueryFactory().createQuery(entityManager);
                 configureParameters(query);
@@ -374,7 +380,15 @@ public class JpaConsumer extends 
ScheduledBatchPollingConsumer {
     protected Exchange createExchange(Object result) {
         Exchange exchange = endpoint.createExchange();
         exchange.getIn().setBody(result);
-        exchange.getIn().setHeader(JpaConstants.JPA_TEMPLATE, 
endpoint.getTemplate());
+        exchange.getIn().setHeader(JpaConstants.ENTITYMANAGER, entityManager);
         return exchange;
     }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        // TODO: This should probably happen, but hitting an open transaction or flush in progress.
+        // Is there a thread holding onto it?
+//        entityManager.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
index 19ea714..cee83f2 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
@@ -17,9 +17,9 @@
 package org.apache.camel.component.jpa;
 
 import java.util.Map;
+
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
-import javax.persistence.Persistence;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -33,20 +33,21 @@ import org.apache.camel.support.ExpressionAdapter;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.ObjectHelper;
-import org.springframework.orm.jpa.JpaTemplate;
 import org.springframework.orm.jpa.JpaTransactionManager;
+import org.springframework.orm.jpa.LocalEntityManagerFactoryBean;
 import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.support.TransactionTemplate;
 
 public class JpaEndpoint extends ScheduledPollEndpoint {
 
     private EntityManagerFactory entityManagerFactory;
     private PlatformTransactionManager transactionManager;
     private String persistenceUnit = "camel";
-    private JpaTemplate template;
     private Expression producerExpression;
     private int maximumResults = -1;
     private Class<?> entityType;
-    private Map<Object, Object> entityManagerProperties;
+    private Map<String, Object> entityManagerProperties;
     private boolean consumeDelete = true;
     private boolean consumeLockEntity = true;
     private boolean flushOnSend = true;
@@ -116,7 +117,7 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
         super.configureProperties(options);
         Map<String, Object> emProperties = 
IntrospectionSupport.extractProperties(options, "emf.");
         if (emProperties != null) {
-            setEntityManagerProperties(CastUtils.cast(emProperties));
+            setEntityManagerProperties(emProperties);
         }
     }
 
@@ -132,17 +133,6 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
 
     // Properties
     // -------------------------------------------------------------------------
-    public JpaTemplate getTemplate() {
-        if (template == null) {
-            template = createTemplate();
-        }
-        return template;
-    }
-
-    public void setTemplate(JpaTemplate template) {
-        this.template = template;
-    }
-
     public Expression getProducerExpression() {
         if (producerExpression == null) {
             producerExpression = createProducerExpression();
@@ -192,14 +182,14 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
         this.transactionManager = transactionManager;
     }
 
-    public Map<Object, Object> getEntityManagerProperties() {
+    public Map<String, Object> getEntityManagerProperties() {
         if (entityManagerProperties == null) {
-            entityManagerProperties = System.getProperties();
+            entityManagerProperties = CastUtils.cast(System.getProperties());
         }
         return entityManagerProperties;
     }
 
-    public void setEntityManagerProperties(Map<Object, Object> 
entityManagerProperties) {
+    public void setEntityManagerProperties(Map<String, Object> 
entityManagerProperties) {
         this.entityManagerProperties = entityManagerProperties;
     }
 
@@ -258,12 +248,12 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
         ObjectHelper.notNull(getEntityManagerFactory(), 
"entityManagerFactory");
     }
 
-    protected JpaTemplate createTemplate() {
-        return new JpaTemplate(getEntityManagerFactory());
-    }
-
     protected EntityManagerFactory createEntityManagerFactory() {
-        return Persistence.createEntityManagerFactory(persistenceUnit, 
getEntityManagerProperties());
+       LocalEntityManagerFactoryBean emfBean = new 
LocalEntityManagerFactoryBean();
+       emfBean.setPersistenceUnitName(persistenceUnit);
+       emfBean.setJpaPropertyMap(getEntityManagerProperties());
+       emfBean.afterPropertiesSet();
+       return emfBean.getObject();
     }
 
     protected PlatformTransactionManager createTransactionManager() {
@@ -276,8 +266,11 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
         return getEntityManagerFactory().createEntityManager();
     }
 
-    protected TransactionStrategy createTransactionStrategy() {
-        return 
JpaTemplateTransactionStrategy.newInstance(getTransactionManager(), 
getTemplate());
+    protected TransactionTemplate createTransactionTemplate() {
+       TransactionTemplate transactionTemplate = new 
TransactionTemplate(getTransactionManager());
+        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+       transactionTemplate.afterPropertiesSet();
+        return transactionTemplate;
     }
 
     protected Expression createProducerExpression() {

http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
index 7e8f2da..c06a1b8 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
@@ -21,18 +21,20 @@ import java.util.Collection;
 import java.util.List;
 
 import javax.persistence.EntityManager;
-import javax.persistence.PersistenceException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.impl.DefaultProducer;
-import org.springframework.orm.jpa.JpaCallback;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
 
 /**
  * @version 
  */
 public class JpaProducer extends DefaultProducer {
-    private final TransactionStrategy template;
+       private final EntityManager entityManager;
+       private final TransactionTemplate transactionTemplate;
     private final JpaEndpoint endpoint;
     private final Expression expression;
 
@@ -40,17 +42,19 @@ public class JpaProducer extends DefaultProducer {
         super(endpoint);
         this.endpoint = endpoint;
         this.expression = expression;
-        this.template = endpoint.createTransactionStrategy();
+        this.entityManager = endpoint.createEntityManager();
+        this.transactionTemplate = endpoint.createTransactionTemplate();
     }
 
     public void process(final Exchange exchange) {
-        exchange.getIn().setHeader(JpaConstants.JPA_TEMPLATE, 
endpoint.getTemplate());
+        exchange.getIn().setHeader(JpaConstants.ENTITYMANAGER, entityManager);
         final Object values = expression.evaluate(exchange, Object.class);
         if (values != null) {
-            template.execute(new JpaCallback<Object>() {
-                
-                public Object doInJpa(EntityManager entityManager) throws 
PersistenceException {
-                    if (values.getClass().isArray()) {
+               transactionTemplate.execute(new TransactionCallback<Object>() {
+                public Object doInTransaction(TransactionStatus status) {
+                       entityManager.joinTransaction();
+                       
+                       if (values.getClass().isArray()) {
                         Object[] array = (Object[]) values;
                         for (int index = 0; index < array.length; index++) {
                             Object managedEntity = save(array[index], 
entityManager);
@@ -80,6 +84,7 @@ public class JpaProducer extends DefaultProducer {
                     if (endpoint.isFlushOnSend()) {
                         entityManager.flush();
                     }
+                    
                     return null;
                 }
 
@@ -100,6 +105,12 @@ public class JpaProducer extends DefaultProducer {
                 }
             });
         }
-        exchange.getIn().removeHeader(JpaConstants.JPA_TEMPLATE);
+        exchange.getIn().removeHeader(JpaConstants.ENTITYMANAGER);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        entityManager.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaTemplateTransactionStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaTemplateTransactionStrategy.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaTemplateTransactionStrategy.java
deleted file mode 100644
index 789fa46..0000000
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaTemplateTransactionStrategy.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.jpa;
-
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.PersistenceException;
-
-import org.springframework.orm.jpa.JpaCallback;
-import org.springframework.orm.jpa.JpaTemplate;
-import org.springframework.orm.jpa.JpaTransactionManager;
-import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
-import org.springframework.transaction.support.TransactionTemplate;
-
-/**
- * Delegates the strategy to the {@link JpaTemplate} and {@link 
TransactionTemplate} for transaction handling
- *
- * @version 
- */
-public class JpaTemplateTransactionStrategy implements TransactionStrategy {
-    private final JpaTemplate jpaTemplate;
-    private final TransactionTemplate transactionTemplate;
-
-    public JpaTemplateTransactionStrategy(JpaTemplate jpaTemplate, 
TransactionTemplate transactionTemplate) {
-        this.jpaTemplate = jpaTemplate;
-        this.transactionTemplate = transactionTemplate;
-    }
-
-    /**
-     * Creates a new implementation from the given JPA factory
-     */
-    public static JpaTemplateTransactionStrategy 
newInstance(EntityManagerFactory emf) {
-        JpaTemplate template = new JpaTemplate(emf);
-        return newInstance(emf, template);
-    }
-
-    /**
-     * Creates a new implementation from the given JPA factory and JPA template
-     */
-    public static JpaTemplateTransactionStrategy 
newInstance(EntityManagerFactory emf, JpaTemplate template) {
-        JpaTransactionManager transactionManager = new 
JpaTransactionManager(emf);
-        transactionManager.afterPropertiesSet();
-
-        TransactionTemplate tranasctionTemplate = new 
TransactionTemplate(transactionManager);
-        tranasctionTemplate.afterPropertiesSet();
-
-        return new JpaTemplateTransactionStrategy(template, 
tranasctionTemplate);
-    }
-
-    /**
-     * Creates a new implementation from the given Transaction Manager and JPA 
template
-     */
-    public static JpaTemplateTransactionStrategy 
newInstance(PlatformTransactionManager transactionManager, JpaTemplate 
template) {
-        TransactionTemplate tranasctionTemplate = new 
TransactionTemplate(transactionManager);
-        tranasctionTemplate.afterPropertiesSet();
-
-        return new JpaTemplateTransactionStrategy(template, 
tranasctionTemplate);
-    }
-
-    
-    public Object execute(final JpaCallback<?> callback) {
-        return transactionTemplate.execute(new TransactionCallback<Object>() {
-            public Object doInTransaction(TransactionStatus status) {
-                return jpaTemplate.execute(new JpaCallback<Object>() {
-                    public Object doInJpa(EntityManager entityManager) throws 
PersistenceException {
-                        return callback.doInJpa(entityManager);
-                    }
-                });
-            }
-        });
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/TransactionStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/TransactionStrategy.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/TransactionStrategy.java
deleted file mode 100644
index e49a742..0000000
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/TransactionStrategy.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.jpa;
-
-import org.springframework.orm.jpa.JpaCallback;
-
-/**
- * @version 
- */
-public interface TransactionStrategy {
-
-    /**
-     * Executes in a transaction.
-     *
-     * @param callback the callback
-     * @return the result
-     */
-    Object execute(JpaCallback<?> callback);
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
index b8a49e9..a46f61c 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
@@ -19,15 +19,16 @@ package org.apache.camel.processor.idempotent.jpa;
 import java.util.Date;
 import java.util.List;
 
+import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Persistence;
+import javax.persistence.Query;
 
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.ServiceSupport;
-import org.springframework.orm.jpa.JpaTemplate;
 import org.springframework.orm.jpa.JpaTransactionManager;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
@@ -40,32 +41,31 @@ import 
org.springframework.transaction.support.TransactionTemplate;
 @ManagedResource(description = "JPA based message id repository")
 public class JpaMessageIdRepository extends ServiceSupport implements 
IdempotentRepository<String> {
     protected static final String QUERY_STRING = "select x from " + 
MessageProcessed.class.getName() + " x where x.processorName = ?1 and 
x.messageId = ?2";
-    private final JpaTemplate jpaTemplate;
     private final String processorName;
+    private final EntityManager entityManager;
     private final TransactionTemplate transactionTemplate;
 
-    public JpaMessageIdRepository(JpaTemplate template, String processorName) {
-        this(template, createTransactionTemplate(template), processorName);
+    public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory, 
String processorName) {
+        this(entityManagerFactory, 
createTransactionTemplate(entityManagerFactory), processorName);
     }
 
-    public JpaMessageIdRepository(JpaTemplate template, TransactionTemplate 
transactionTemplate, String processorName) {
-        this.jpaTemplate = template;
+    public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory, 
TransactionTemplate transactionTemplate, String processorName) {
+        this.entityManager = entityManagerFactory.createEntityManager();
         this.processorName = processorName;
         this.transactionTemplate = transactionTemplate;
     }
 
     public static JpaMessageIdRepository jpaMessageIdRepository(String 
persistenceUnit, String processorName) {
-        EntityManagerFactory entityManagerFactory = 
Persistence.createEntityManagerFactory(persistenceUnit);
-        return jpaMessageIdRepository(new JpaTemplate(entityManagerFactory), 
processorName);
+        return 
jpaMessageIdRepository(Persistence.createEntityManagerFactory(persistenceUnit), 
processorName);
     }
 
-    public static JpaMessageIdRepository jpaMessageIdRepository(JpaTemplate 
jpaTemplate, String processorName) {
-        return new JpaMessageIdRepository(jpaTemplate, processorName);
+    public static JpaMessageIdRepository 
jpaMessageIdRepository(EntityManagerFactory entityManagerFactory, String 
processorName) {
+        return new JpaMessageIdRepository(entityManagerFactory, processorName);
     }
 
-    private static TransactionTemplate createTransactionTemplate(JpaTemplate 
jpaTemplate) {
+    private static TransactionTemplate 
createTransactionTemplate(EntityManagerFactory entityManagerFactory) {
         TransactionTemplate transactionTemplate = new TransactionTemplate();
-        transactionTemplate.setTransactionManager(new 
JpaTransactionManager(jpaTemplate.getEntityManagerFactory()));
+        transactionTemplate.setTransactionManager(new 
JpaTransactionManager(entityManagerFactory));
         
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
         return transactionTemplate;
     }
@@ -75,14 +75,16 @@ public class JpaMessageIdRepository extends ServiceSupport 
implements Idempotent
         // Run this in single transaction.
         Boolean rc = transactionTemplate.execute(new 
TransactionCallback<Boolean>() {
             public Boolean doInTransaction(TransactionStatus arg0) {
-                List<?> list = jpaTemplate.find(QUERY_STRING, processorName, 
messageId);
+               entityManager.joinTransaction();
+               
+               List<?> list = query(messageId);
                 if (list.isEmpty()) {
                     MessageProcessed processed = new MessageProcessed();
                     processed.setProcessorName(processorName);
                     processed.setMessageId(messageId);
                     processed.setCreatedAt(new Date());
-                    jpaTemplate.persist(processed);
-                    jpaTemplate.flush();
+                    entityManager.persist(processed);
+                    entityManager.flush();
                     return Boolean.TRUE;
                 } else {
                     return Boolean.FALSE;
@@ -97,7 +99,9 @@ public class JpaMessageIdRepository extends ServiceSupport 
implements Idempotent
         // Run this in single transaction.
         Boolean rc = transactionTemplate.execute(new 
TransactionCallback<Boolean>() {
             public Boolean doInTransaction(TransactionStatus arg0) {
-                List<?> list = jpaTemplate.find(QUERY_STRING, processorName, 
messageId);
+               entityManager.joinTransaction();
+               
+               List<?> list = query(messageId);
                 if (list.isEmpty()) {
                     return Boolean.FALSE;
                 } else {
@@ -112,19 +116,28 @@ public class JpaMessageIdRepository extends 
ServiceSupport implements Idempotent
     public boolean remove(final String messageId) {
         Boolean rc = transactionTemplate.execute(new 
TransactionCallback<Boolean>() {
             public Boolean doInTransaction(TransactionStatus arg0) {
-                List<?> list = jpaTemplate.find(QUERY_STRING, processorName, 
messageId);
+               entityManager.joinTransaction();
+               
+                List<?> list = query(messageId);
                 if (list.isEmpty()) {
                     return Boolean.FALSE;
                 } else {
-                    MessageProcessed processoed = (MessageProcessed) 
list.get(0);
-                    jpaTemplate.remove(processoed);
-                    jpaTemplate.flush();
+                    MessageProcessed processed = (MessageProcessed) 
list.get(0);
+                    entityManager.remove(processed);
+                    entityManager.flush();
                     return Boolean.TRUE;
                 }
             }
         });
         return rc.booleanValue();
     }
+    
+    private List<?> query(final String messageId) {
+       Query query = entityManager.createQuery(QUERY_STRING);
+       query.setParameter(1, processorName);
+       query.setParameter(2, messageId);
+       return query.getResultList();
+    }
 
     public boolean confirm(String s) {
         // noop
@@ -142,5 +155,6 @@ public class JpaMessageIdRepository extends ServiceSupport 
implements Idempotent
 
     @Override
     protected void doStop() throws Exception {
+       entityManager.close();
     }
 }

Reply via email to