CAMEL-5932 updated consumer and producer for direct EntityManager usage
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1efc537f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1efc537f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1efc537f Branch: refs/heads/master Commit: 1efc537f8752772f54c1e2e1ff84bf733ba1d055 Parents: 708c4cd Author: Brett Meyer <br...@3riverdev.com> Authored: Thu Aug 8 14:07:33 2013 -0400 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 21 15:06:32 2013 +0200 ---------------------------------------------------------------------- .../camel/component/jpa/JpaConstants.java | 2 +- .../apache/camel/component/jpa/JpaConsumer.java | 28 ++++-- .../apache/camel/component/jpa/JpaEndpoint.java | 45 +++++----- .../apache/camel/component/jpa/JpaProducer.java | 31 ++++--- .../jpa/JpaTemplateTransactionStrategy.java | 89 -------------------- .../component/jpa/TransactionStrategy.java | 33 -------- .../idempotent/jpa/JpaMessageIdRepository.java | 54 +++++++----- 7 files changed, 96 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java index ed5b922..3e42d9f 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java @@ -23,7 +23,7 @@ package org.apache.camel.component.jpa; */ public final class JpaConstants { - public static final String JPA_TEMPLATE = "CamelJpaTemplate"; + public static final String ENTITYMANAGER = "CamelEntityManager"; private JpaConstants() { // utility class http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java index 5f81041..10cdd2d 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java @@ -36,7 +36,9 @@ import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.orm.jpa.JpaCallback; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionTemplate; /** * @version @@ -45,7 +47,8 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(JpaConsumer.class); private final JpaEndpoint endpoint; - private final TransactionStrategy template; + private final EntityManager entityManager; + private final TransactionTemplate transactionTemplate; private QueryFactory queryFactory; private DeleteHandler<Object> deleteHandler; private String query; @@ -66,7 +69,8 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { public JpaConsumer(JpaEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; - this.template = endpoint.createTransactionStrategy(); + this.entityManager = endpoint.createEntityManager(); + this.transactionTemplate = endpoint.createTransactionTemplate(); } @Override @@ -75,9 +79,11 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { shutdownRunningTask = null; pendingExchanges = 0; - Object messagePolled = template.execute(new JpaCallback<Object>() { - public Object doInJpa(EntityManager entityManager) throws PersistenceException { - Queue<DataHolder> answer = new LinkedList<DataHolder>(); + Object messagePolled = transactionTemplate.execute(new TransactionCallback<Object>() { + public Object doInTransaction(TransactionStatus status) { + entityManager.joinTransaction(); + + Queue<DataHolder> answer = new LinkedList<DataHolder>(); Query query = getQueryFactory().createQuery(entityManager); configureParameters(query); @@ -374,7 +380,15 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { protected Exchange createExchange(Object result) { Exchange exchange = endpoint.createExchange(); exchange.getIn().setBody(result); - exchange.getIn().setHeader(JpaConstants.JPA_TEMPLATE, endpoint.getTemplate()); + exchange.getIn().setHeader(JpaConstants.ENTITYMANAGER, entityManager); return exchange; } + + @Override + protected void doStop() throws Exception { + super.doStop(); + // TODO: This should probably happen, but hitting an open transaction or flush in progress. + // Is there a thread holding onto it? +// entityManager.close(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java index 19ea714..cee83f2 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java @@ -17,9 +17,9 @@ package org.apache.camel.component.jpa; import java.util.Map; + import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; -import javax.persistence.Persistence; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -33,20 +33,21 @@ import org.apache.camel.support.ExpressionAdapter; import org.apache.camel.util.CastUtils; import org.apache.camel.util.IntrospectionSupport; import org.apache.camel.util.ObjectHelper; -import org.springframework.orm.jpa.JpaTemplate; import org.springframework.orm.jpa.JpaTransactionManager; +import org.springframework.orm.jpa.LocalEntityManagerFactoryBean; import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.support.TransactionTemplate; public class JpaEndpoint extends ScheduledPollEndpoint { private EntityManagerFactory entityManagerFactory; private PlatformTransactionManager transactionManager; private String persistenceUnit = "camel"; - private JpaTemplate template; private Expression producerExpression; private int maximumResults = -1; private Class<?> entityType; - private Map<Object, Object> entityManagerProperties; + private Map<String, Object> entityManagerProperties; private boolean consumeDelete = true; private boolean consumeLockEntity = true; private boolean flushOnSend = true; @@ -116,7 +117,7 @@ public class JpaEndpoint extends ScheduledPollEndpoint { super.configureProperties(options); Map<String, Object> emProperties = IntrospectionSupport.extractProperties(options, "emf."); if (emProperties != null) { - setEntityManagerProperties(CastUtils.cast(emProperties)); + setEntityManagerProperties(emProperties); } } @@ -132,17 +133,6 @@ public class JpaEndpoint extends ScheduledPollEndpoint { // Properties // ------------------------------------------------------------------------- - public JpaTemplate getTemplate() { - if (template == null) { - template = createTemplate(); - } - return template; - } - - public void setTemplate(JpaTemplate template) { - this.template = template; - } - public Expression getProducerExpression() { if (producerExpression == null) { producerExpression = createProducerExpression(); @@ -192,14 +182,14 @@ public class JpaEndpoint extends ScheduledPollEndpoint { this.transactionManager = transactionManager; } - public Map<Object, Object> getEntityManagerProperties() { + public Map<String, Object> getEntityManagerProperties() { if (entityManagerProperties == null) { - entityManagerProperties = System.getProperties(); + entityManagerProperties = CastUtils.cast(System.getProperties()); } return entityManagerProperties; } - public void setEntityManagerProperties(Map<Object, Object> entityManagerProperties) { + public void setEntityManagerProperties(Map<String, Object> entityManagerProperties) { this.entityManagerProperties = entityManagerProperties; } @@ -258,12 +248,12 @@ public class JpaEndpoint extends ScheduledPollEndpoint { ObjectHelper.notNull(getEntityManagerFactory(), "entityManagerFactory"); } - protected JpaTemplate createTemplate() { - return new JpaTemplate(getEntityManagerFactory()); - } - protected EntityManagerFactory createEntityManagerFactory() { - return Persistence.createEntityManagerFactory(persistenceUnit, getEntityManagerProperties()); + LocalEntityManagerFactoryBean emfBean = new LocalEntityManagerFactoryBean(); + emfBean.setPersistenceUnitName(persistenceUnit); + emfBean.setJpaPropertyMap(getEntityManagerProperties()); + emfBean.afterPropertiesSet(); + return emfBean.getObject(); } protected PlatformTransactionManager createTransactionManager() { @@ -276,8 +266,11 @@ public class JpaEndpoint extends ScheduledPollEndpoint { return getEntityManagerFactory().createEntityManager(); } - protected TransactionStrategy createTransactionStrategy() { - return JpaTemplateTransactionStrategy.newInstance(getTransactionManager(), getTemplate()); + protected TransactionTemplate createTransactionTemplate() { + TransactionTemplate transactionTemplate = new TransactionTemplate(getTransactionManager()); + transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); + transactionTemplate.afterPropertiesSet(); + return transactionTemplate; } protected Expression createProducerExpression() { http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java index 7e8f2da..c06a1b8 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java @@ -21,18 +21,20 @@ import java.util.Collection; import java.util.List; import javax.persistence.EntityManager; -import javax.persistence.PersistenceException; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.impl.DefaultProducer; -import org.springframework.orm.jpa.JpaCallback; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionTemplate; /** * @version */ public class JpaProducer extends DefaultProducer { - private final TransactionStrategy template; + private final EntityManager entityManager; + private final TransactionTemplate transactionTemplate; private final JpaEndpoint endpoint; private final Expression expression; @@ -40,17 +42,19 @@ public class JpaProducer extends DefaultProducer { super(endpoint); this.endpoint = endpoint; this.expression = expression; - this.template = endpoint.createTransactionStrategy(); + this.entityManager = endpoint.createEntityManager(); + this.transactionTemplate = endpoint.createTransactionTemplate(); } public void process(final Exchange exchange) { - exchange.getIn().setHeader(JpaConstants.JPA_TEMPLATE, endpoint.getTemplate()); + exchange.getIn().setHeader(JpaConstants.ENTITYMANAGER, entityManager); final Object values = expression.evaluate(exchange, Object.class); if (values != null) { - template.execute(new JpaCallback<Object>() { - - public Object doInJpa(EntityManager entityManager) throws PersistenceException { - if (values.getClass().isArray()) { + transactionTemplate.execute(new TransactionCallback<Object>() { + public Object doInTransaction(TransactionStatus status) { + entityManager.joinTransaction(); + + if (values.getClass().isArray()) { Object[] array = (Object[]) values; for (int index = 0; index < array.length; index++) { Object managedEntity = save(array[index], entityManager); @@ -80,6 +84,7 @@ public class JpaProducer extends DefaultProducer { if (endpoint.isFlushOnSend()) { entityManager.flush(); } + return null; } @@ -100,6 +105,12 @@ public class JpaProducer extends DefaultProducer { } }); } - exchange.getIn().removeHeader(JpaConstants.JPA_TEMPLATE); + exchange.getIn().removeHeader(JpaConstants.ENTITYMANAGER); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + entityManager.close(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaTemplateTransactionStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaTemplateTransactionStrategy.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaTemplateTransactionStrategy.java deleted file mode 100644 index 789fa46..0000000 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaTemplateTransactionStrategy.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.jpa; - -import javax.persistence.EntityManager; -import javax.persistence.EntityManagerFactory; -import javax.persistence.PersistenceException; - -import org.springframework.orm.jpa.JpaCallback; -import org.springframework.orm.jpa.JpaTemplate; -import org.springframework.orm.jpa.JpaTransactionManager; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; -import org.springframework.transaction.support.TransactionTemplate; - -/** - * Delegates the strategy to the {@link JpaTemplate} and {@link TransactionTemplate} for transaction handling - * - * @version - */ -public class JpaTemplateTransactionStrategy implements TransactionStrategy { - private final JpaTemplate jpaTemplate; - private final TransactionTemplate transactionTemplate; - - public JpaTemplateTransactionStrategy(JpaTemplate jpaTemplate, TransactionTemplate transactionTemplate) { - this.jpaTemplate = jpaTemplate; - this.transactionTemplate = transactionTemplate; - } - - /** - * Creates a new implementation from the given JPA factory - */ - public static JpaTemplateTransactionStrategy newInstance(EntityManagerFactory emf) { - JpaTemplate template = new JpaTemplate(emf); - return newInstance(emf, template); - } - - /** - * Creates a new implementation from the given JPA factory and JPA template - */ - public static JpaTemplateTransactionStrategy newInstance(EntityManagerFactory emf, JpaTemplate template) { - JpaTransactionManager transactionManager = new JpaTransactionManager(emf); - transactionManager.afterPropertiesSet(); - - TransactionTemplate tranasctionTemplate = new TransactionTemplate(transactionManager); - tranasctionTemplate.afterPropertiesSet(); - - return new JpaTemplateTransactionStrategy(template, tranasctionTemplate); - } - - /** - * Creates a new implementation from the given Transaction Manager and JPA template - */ - public static JpaTemplateTransactionStrategy newInstance(PlatformTransactionManager transactionManager, JpaTemplate template) { - TransactionTemplate tranasctionTemplate = new TransactionTemplate(transactionManager); - tranasctionTemplate.afterPropertiesSet(); - - return new JpaTemplateTransactionStrategy(template, tranasctionTemplate); - } - - - public Object execute(final JpaCallback<?> callback) { - return transactionTemplate.execute(new TransactionCallback<Object>() { - public Object doInTransaction(TransactionStatus status) { - return jpaTemplate.execute(new JpaCallback<Object>() { - public Object doInJpa(EntityManager entityManager) throws PersistenceException { - return callback.doInJpa(entityManager); - } - }); - } - }); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/TransactionStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/TransactionStrategy.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/TransactionStrategy.java deleted file mode 100644 index e49a742..0000000 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/TransactionStrategy.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.jpa; - -import org.springframework.orm.jpa.JpaCallback; - -/** - * @version - */ -public interface TransactionStrategy { - - /** - * Executes in a transaction. - * - * @param callback the callback - * @return the result - */ - Object execute(JpaCallback<?> callback); -} http://git-wip-us.apache.org/repos/asf/camel/blob/1efc537f/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java index b8a49e9..a46f61c 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java @@ -19,15 +19,16 @@ package org.apache.camel.processor.idempotent.jpa; import java.util.Date; import java.util.List; +import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Persistence; +import javax.persistence.Query; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.support.ServiceSupport; -import org.springframework.orm.jpa.JpaTemplate; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; @@ -40,32 +41,31 @@ import org.springframework.transaction.support.TransactionTemplate; @ManagedResource(description = "JPA based message id repository") public class JpaMessageIdRepository extends ServiceSupport implements IdempotentRepository<String> { protected static final String QUERY_STRING = "select x from " + MessageProcessed.class.getName() + " x where x.processorName = ?1 and x.messageId = ?2"; - private final JpaTemplate jpaTemplate; private final String processorName; + private final EntityManager entityManager; private final TransactionTemplate transactionTemplate; - public JpaMessageIdRepository(JpaTemplate template, String processorName) { - this(template, createTransactionTemplate(template), processorName); + public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory, String processorName) { + this(entityManagerFactory, createTransactionTemplate(entityManagerFactory), processorName); } - public JpaMessageIdRepository(JpaTemplate template, TransactionTemplate transactionTemplate, String processorName) { - this.jpaTemplate = template; + public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory, TransactionTemplate transactionTemplate, String processorName) { + this.entityManager = entityManagerFactory.createEntityManager(); this.processorName = processorName; this.transactionTemplate = transactionTemplate; } public static JpaMessageIdRepository jpaMessageIdRepository(String persistenceUnit, String processorName) { - EntityManagerFactory entityManagerFactory = Persistence.createEntityManagerFactory(persistenceUnit); - return jpaMessageIdRepository(new JpaTemplate(entityManagerFactory), processorName); + return jpaMessageIdRepository(Persistence.createEntityManagerFactory(persistenceUnit), processorName); } - public static JpaMessageIdRepository jpaMessageIdRepository(JpaTemplate jpaTemplate, String processorName) { - return new JpaMessageIdRepository(jpaTemplate, processorName); + public static JpaMessageIdRepository jpaMessageIdRepository(EntityManagerFactory entityManagerFactory, String processorName) { + return new JpaMessageIdRepository(entityManagerFactory, processorName); } - private static TransactionTemplate createTransactionTemplate(JpaTemplate jpaTemplate) { + private static TransactionTemplate createTransactionTemplate(EntityManagerFactory entityManagerFactory) { TransactionTemplate transactionTemplate = new TransactionTemplate(); - transactionTemplate.setTransactionManager(new JpaTransactionManager(jpaTemplate.getEntityManagerFactory())); + transactionTemplate.setTransactionManager(new JpaTransactionManager(entityManagerFactory)); transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); return transactionTemplate; } @@ -75,14 +75,16 @@ public class JpaMessageIdRepository extends ServiceSupport implements Idempotent // Run this in single transaction. Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { public Boolean doInTransaction(TransactionStatus arg0) { - List<?> list = jpaTemplate.find(QUERY_STRING, processorName, messageId); + entityManager.joinTransaction(); + + List<?> list = query(messageId); if (list.isEmpty()) { MessageProcessed processed = new MessageProcessed(); processed.setProcessorName(processorName); processed.setMessageId(messageId); processed.setCreatedAt(new Date()); - jpaTemplate.persist(processed); - jpaTemplate.flush(); + entityManager.persist(processed); + entityManager.flush(); return Boolean.TRUE; } else { return Boolean.FALSE; @@ -97,7 +99,9 @@ public class JpaMessageIdRepository extends ServiceSupport implements Idempotent // Run this in single transaction. Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { public Boolean doInTransaction(TransactionStatus arg0) { - List<?> list = jpaTemplate.find(QUERY_STRING, processorName, messageId); + entityManager.joinTransaction(); + + List<?> list = query(messageId); if (list.isEmpty()) { return Boolean.FALSE; } else { @@ -112,19 +116,28 @@ public class JpaMessageIdRepository extends ServiceSupport implements Idempotent public boolean remove(final String messageId) { Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { public Boolean doInTransaction(TransactionStatus arg0) { - List<?> list = jpaTemplate.find(QUERY_STRING, processorName, messageId); + entityManager.joinTransaction(); + + List<?> list = query(messageId); if (list.isEmpty()) { return Boolean.FALSE; } else { - MessageProcessed processoed = (MessageProcessed) list.get(0); - jpaTemplate.remove(processoed); - jpaTemplate.flush(); + MessageProcessed processed = (MessageProcessed) list.get(0); + entityManager.remove(processed); + entityManager.flush(); return Boolean.TRUE; } } }); return rc.booleanValue(); } + + private List<?> query(final String messageId) { + Query query = entityManager.createQuery(QUERY_STRING); + query.setParameter(1, processorName); + query.setParameter(2, messageId); + return query.getResultList(); + } public boolean confirm(String s) { // noop @@ -142,5 +155,6 @@ public class JpaMessageIdRepository extends ServiceSupport implements Idempotent @Override protected void doStop() throws Exception { + entityManager.close(); } }