CAMEL-8478: IdempotentRepository - Add clear operation, align JpaMessageIdRepository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/02a6aa25 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/02a6aa25 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/02a6aa25 Branch: refs/heads/master Commit: 02a6aa2571f0eb5d496d2bb850c0d0b75ff79138 Parents: c4261d6 Author: Andrea Cosentino <anco...@gmail.com> Authored: Fri Jul 3 17:08:10 2015 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Fri Jul 3 17:08:10 2015 +0200 ---------------------------------------------------------------------- .../idempotent/jpa/JpaMessageIdRepository.java | 35 ++++++++++++++++++++ 1 file changed, 35 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/02a6aa25/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java index 4af8658..2f39177 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java @@ -17,7 +17,9 @@ package org.apache.camel.processor.idempotent.jpa; import java.util.Date; +import java.util.Iterator; import java.util.List; + import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Persistence; @@ -45,6 +47,7 @@ import static org.apache.camel.component.jpa.JpaHelper.getTargetEntityManager; @ManagedResource(description = "JPA based message id repository") public class JpaMessageIdRepository extends ServiceSupport implements ExchangeIdempotentRepository<String> { protected static final String QUERY_STRING = "select x from " + MessageProcessed.class.getName() + " x where x.processorName = ?1 and x.messageId = ?2"; + protected static final String QUERY_CLEAR_STRING = "select x from " + MessageProcessed.class.getName() + " x where x.processorName = ?1"; private static final Logger LOG = LoggerFactory.getLogger(JpaMessageIdRepository.class); private final String processorName; private final EntityManagerFactory entityManagerFactory; @@ -183,12 +186,44 @@ public class JpaMessageIdRepository extends ServiceSupport implements ExchangeId return true; } + @ManagedOperation(description = "Clear the store") + public void clear() { + final EntityManager entityManager = getTargetEntityManager(null, entityManagerFactory, true, sharedEntityManager); + + Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { + public Boolean doInTransaction(TransactionStatus status) { + if (isJoinTransaction()) { + entityManager.joinTransaction(); + } + + List<?> list = queryClear(entityManager); + if (!list.isEmpty()) { + Iterator it = list.iterator(); + while (it.hasNext()) { + Object item = it.next(); + entityManager.remove(item); + entityManager.flush(); + } + } + return Boolean.TRUE; + } + }); + + LOG.debug("clear the store {}", MessageProcessed.class.getName()); + } + private List<?> query(final EntityManager entityManager, final String messageId) { Query query = entityManager.createQuery(QUERY_STRING); query.setParameter(1, processorName); query.setParameter(2, messageId); return query.getResultList(); } + + private List<?> queryClear(final EntityManager entityManager) { + Query query = entityManager.createQuery(QUERY_CLEAR_STRING); + query.setParameter(1, processorName); + return query.getResultList(); + } @ManagedAttribute(description = "The processor name") public String getProcessorName() {