Repository: camel Updated Branches: refs/heads/master 0a3a6b3d0 -> 4ecb445e9
CAMEL-7053 Added option SkipLockedEntity to camel-jpa Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4ecb445e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4ecb445e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4ecb445e Branch: refs/heads/master Commit: 4ecb445e9a88c5ae741c0815fc18309fba31ea0e Parents: 0a3a6b3 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue Feb 18 17:25:56 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Tue Feb 18 17:25:56 2014 +0800 ---------------------------------------------------------------------- components/camel-jpa/pom.xml | 7 ++ .../apache/camel/component/jpa/JpaConsumer.java | 37 +++++- .../apache/camel/examples/VersionedItem.java | 74 +++++++++++ .../jpa/JpaRouteSkipLockedEntityTest.java | 125 +++++++++++++++++++ .../src/test/resources/META-INF/persistence.xml | 11 ++ .../jpa/springJpaRouteSkipLockedTest.xml | 33 +++++ 6 files changed, 285 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-jpa/pom.xml b/components/camel-jpa/pom.xml index 5c42547..5cf0679 100644 --- a/components/camel-jpa/pom.xml +++ b/components/camel-jpa/pom.xml @@ -102,6 +102,12 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.hibernate</groupId> + <artifactId>hibernate-entitymanager</artifactId> + <version>${hibernate-version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -113,6 +119,7 @@ <configuration> <persistenceXmlFile>${project.basedir}/src/test/resources/META-INF/persistence.xml</persistenceXmlFile> <includes>org/apache/camel/examples/*.class</includes> + <excludes>org/apache/camel/examples/VersionedItem.class</excludes> <addDefaultConstructor>true</addDefaultConstructor> <enforcePropertyRestrictions>true</enforcePropertyRestrictions> </configuration> http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java index c239410..12514eb 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.jpa; import java.lang.reflect.Method; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -26,7 +27,9 @@ import java.util.Queue; import javax.persistence.Entity; import javax.persistence.EntityManager; import javax.persistence.LockModeType; +import javax.persistence.OptimisticLockException; import javax.persistence.PersistenceException; +import javax.persistence.PessimisticLockException; import javax.persistence.Query; import org.apache.camel.Exchange; @@ -45,6 +48,7 @@ import org.springframework.transaction.support.TransactionTemplate; */ public class JpaConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(JpaConsumer.class); + private static final Map<String, Object> NOWAIT; private final EntityManager entityManager; private final TransactionTemplate transactionTemplate; private QueryFactory queryFactory; @@ -53,10 +57,16 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { private String query; private String namedQuery; private String nativeQuery; - private LockModeType lockModeType = LockModeType.WRITE; + private LockModeType lockModeType = LockModeType.PESSIMISTIC_WRITE; private Map<String, Object> parameters; private Class<?> resultClass; private boolean transacted; + private boolean skipLockedEntity; + + static { + NOWAIT = new HashMap<String, Object>(); + NOWAIT.put("javax.persistence.lock.timeout", 0L); + } private static final class DataHolder { private Exchange exchange; @@ -281,6 +291,21 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { this.transacted = transacted; } + /** + * Sets whether to use NOWAIT on lock and silently skip the entity. This + * allows different instances to process entities at the same time but not + * processing the same entity. + * + * @param skipLockedEntity + */ + public void setSkipLockedEntity(boolean skipLockedEntity) { + this.skipLockedEntity = skipLockedEntity; + } + + public boolean isSkipLockedEntity() { + return skipLockedEntity; + } + // Implementation methods // ------------------------------------------------------------------------- @@ -298,12 +323,20 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { } try { LOG.debug("Acquiring exclusive lock on entity: {}", entity); - entityManager.lock(entity, lockModeType); + if (isSkipLockedEntity()) { + entityManager.lock(entity, lockModeType, NOWAIT); + } else { + entityManager.lock(entity, lockModeType); + } return true; } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Failed to achieve lock on entity: " + entity + ". Reason: " + e, e); } + if (e instanceof PessimisticLockException || e instanceof OptimisticLockException) { + //transaction marked as rollback can't continue gracefully + throw (PersistenceException) e; + } //TODO: Find if possible an alternative way to handle results of native queries. //Result of native queries are Arrays and cannot be locked by all JPA Providers. if (entity.getClass().isArray()) { http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/src/test/java/org/apache/camel/examples/VersionedItem.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/examples/VersionedItem.java b/components/camel-jpa/src/test/java/org/apache/camel/examples/VersionedItem.java new file mode 100644 index 0000000..d76265a --- /dev/null +++ b/components/camel-jpa/src/test/java/org/apache/camel/examples/VersionedItem.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.examples; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.Id; +import javax.persistence.Version; + +/** + * Represents a task which is added to the database, then removed from the + * database when it is consumed with a version column + * + * @version + */ +@Entity +public class VersionedItem { + @Id + @GeneratedValue + private Long id; + private String name; + @Version + private Long version; + + public VersionedItem() { + } + + public VersionedItem(String name) { + setName(name); + } + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Long getVersion() { + return version; + } + + public void setVersion(Long version) { + this.version = version; + } + + @Override + public String toString() { + return "VersionedItem [id=" + id + ", name=" + name + "]"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSkipLockedEntityTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSkipLockedEntityTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSkipLockedEntityTest.java new file mode 100644 index 0000000..3c95a06 --- /dev/null +++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSkipLockedEntityTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.jpa; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.examples.VersionedItem; +import org.apache.camel.spring.SpringRouteBuilder; +import org.hibernate.engine.spi.SessionImplementor; +import org.junit.Test; + +/** + * @version + */ +public class JpaRouteSkipLockedEntityTest extends AbstractJpaTest { + protected static final String SELECT_ALL_STRING = "select x from " + VersionedItem.class.getName() + " x"; + + private int count; + private final ReentrantLock lock = new ReentrantLock(); + private Condition cond1 = lock.newCondition(); + + @Test + public void testRouteJpa() throws Exception { + MockEndpoint mock1 = getMockEndpoint("mock:result1"); + mock1.expectedMessageCount(2); + MockEndpoint mock2 = getMockEndpoint("mock:result2"); + mock2.expectedMessageCount(2); + + template.sendBody("jpa://" + VersionedItem.class.getName(), new VersionedItem("one")); + template.sendBody("jpa://" + VersionedItem.class.getName(), new VersionedItem("two")); + template.sendBody("jpa://" + VersionedItem.class.getName(), new VersionedItem("three")); + template.sendBody("jpa://" + VersionedItem.class.getName(), new VersionedItem("four")); + + this.context.startRoute("first"); + this.context.startRoute("second"); + + assertMockEndpointsSatisfied(); + + //force test to wait till finished + this.context.stopRoute("first"); + this.context.stopRoute("second"); + + setLockTimeout(60); + List<?> list = entityManager.createQuery(selectAllString()).getResultList(); + assertEquals(0, list.size()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new SpringRouteBuilder() { + @Override + public void configure() { + String options = "?consumer.skipLockedEntity=true"; //&consumer.lockModeType=PESSIMISTIC_FORCE_INCREMENT"; + from("jpa://" + VersionedItem.class.getName() + options).routeId("first").autoStartup(false).bean(new WaitLatch()).log("route1: ${body}").to("mock:result1"); + from("jpa2://select" + options + "&consumer.query=select s from VersionedItem s").routeId("second").autoStartup(false).bean(new WaitLatch()).log("route2: ${body}").to("mock:result2"); + } + }; + } + + @Override + protected String routeXml() { + return "org/apache/camel/processor/jpa/springJpaRouteSkipLockedTest.xml"; + } + + @Override + protected String selectAllString() { + return SELECT_ALL_STRING; + } + + public class WaitLatch { + public void onMessage(VersionedItem body) throws Exception { + lock.lock(); + try { + + count++; + + // if (count != 1) { + cond1.signal(); + // } + + // if not last + if (count != 4) { + cond1.await(); + } + } finally { + lock.unlock(); + } + } + } + + @Override + public void setUp() throws Exception { + super.setUp(); + setLockTimeout(0); + } + + public void setLockTimeout(int timeout) throws SQLException { + entityManager.getTransaction().begin(); + SessionImplementor session = entityManager.unwrap(SessionImplementor.class); + Connection connection = session.connection(); + connection.createStatement().execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY('derby.locks.waitTimeout', '" + timeout + "')"); + entityManager.getTransaction().commit(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/src/test/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/resources/META-INF/persistence.xml b/components/camel-jpa/src/test/resources/META-INF/persistence.xml index f799e37..b6cdcdf 100644 --- a/components/camel-jpa/src/test/resources/META-INF/persistence.xml +++ b/components/camel-jpa/src/test/resources/META-INF/persistence.xml @@ -88,4 +88,15 @@ </persistence-unit> <!-- END SNIPPET: e2 --> + <persistence-unit name="hibernate" transaction-type="RESOURCE_LOCAL"> + <provider>org.hibernate.ejb.HibernatePersistence</provider> + <class>org.apache.camel.examples.VersionedItem</class> + + <properties> + <property name="hibernate.connection.driver_class" value="org.apache.derby.jdbc.EmbeddedDriver" /> + <property name="hibernate.connection.url" value="jdbc:derby:target/hibernate;create=true" /> + <property name="hibernate.hbm2ddl.auto" value="create"/> + </properties> + </persistence-unit> + </persistence> http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRouteSkipLockedTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRouteSkipLockedTest.xml b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRouteSkipLockedTest.xml new file mode 100644 index 0000000..7cff3ee --- /dev/null +++ b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRouteSkipLockedTest.xml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations + under the License. --> +<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> + + <bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate"> + <property name="transactionManager" ref="tm"/> + </bean> + + <bean class="org.springframework.orm.jpa.JpaTransactionManager" id="tm"> + <property name="entityManagerFactory" ref="entityManagerFactory" /> + </bean> + + <bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean"> + <property name="persistenceUnitName" value="hibernate" /> + <property name="jpaDialect"> + <bean class="org.springframework.orm.jpa.vendor.HibernateJpaDialect" /> + </property> + </bean> + + <bean class="org.apache.camel.component.jpa.JpaComponent" id="jpa"> + <property name="entityManagerFactory" ref="entityManagerFactory" /> + </bean> + <bean class="org.apache.camel.component.jpa.JpaComponent" id="jpa2"> + <property name="entityManagerFactory" ref="entityManagerFactory" /> + </bean> + +</beans> \ No newline at end of file