Repository: camel
Updated Branches:
  refs/heads/master 0a3a6b3d0 -> 4ecb445e9


CAMEL-7053 Added option SkipLockedEntity to camel-jpa


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4ecb445e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4ecb445e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4ecb445e

Branch: refs/heads/master
Commit: 4ecb445e9a88c5ae741c0815fc18309fba31ea0e
Parents: 0a3a6b3
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Tue Feb 18 17:25:56 2014 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Tue Feb 18 17:25:56 2014 +0800

----------------------------------------------------------------------
 components/camel-jpa/pom.xml                    |   7 ++
 .../apache/camel/component/jpa/JpaConsumer.java |  37 +++++-
 .../apache/camel/examples/VersionedItem.java    |  74 +++++++++++
 .../jpa/JpaRouteSkipLockedEntityTest.java       | 125 +++++++++++++++++++
 .../src/test/resources/META-INF/persistence.xml |  11 ++
 .../jpa/springJpaRouteSkipLockedTest.xml        |  33 +++++
 6 files changed, 285 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-jpa/pom.xml b/components/camel-jpa/pom.xml
index 5c42547..5cf0679 100644
--- a/components/camel-jpa/pom.xml
+++ b/components/camel-jpa/pom.xml
@@ -102,6 +102,12 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+       <groupId>org.hibernate</groupId>
+       <artifactId>hibernate-entitymanager</artifactId>
+       <version>${hibernate-version}</version>
+       <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -113,6 +119,7 @@
         <configuration>
           
<persistenceXmlFile>${project.basedir}/src/test/resources/META-INF/persistence.xml</persistenceXmlFile>
           <includes>org/apache/camel/examples/*.class</includes>
+          <excludes>org/apache/camel/examples/VersionedItem.class</excludes>
           <addDefaultConstructor>true</addDefaultConstructor>
           <enforcePropertyRestrictions>true</enforcePropertyRestrictions>
         </configuration>

http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index c239410..12514eb 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jpa;
 
 import java.lang.reflect.Method;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -26,7 +27,9 @@ import java.util.Queue;
 import javax.persistence.Entity;
 import javax.persistence.EntityManager;
 import javax.persistence.LockModeType;
+import javax.persistence.OptimisticLockException;
 import javax.persistence.PersistenceException;
+import javax.persistence.PessimisticLockException;
 import javax.persistence.Query;
 
 import org.apache.camel.Exchange;
@@ -45,6 +48,7 @@ import org.springframework.transaction.support.TransactionTemplate;
  */
 public class JpaConsumer extends ScheduledBatchPollingConsumer {
     private static final Logger LOG = LoggerFactory.getLogger(JpaConsumer.class);
+    private static final Map<String, Object> NOWAIT;
     private final EntityManager entityManager;
     private final TransactionTemplate transactionTemplate;
     private QueryFactory queryFactory;
@@ -53,10 +57,16 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
     private String query;
     private String namedQuery;
     private String nativeQuery;
-    private LockModeType lockModeType = LockModeType.WRITE;
+    private LockModeType lockModeType = LockModeType.PESSIMISTIC_WRITE;
     private Map<String, Object> parameters;
     private Class<?> resultClass;
     private boolean transacted;
+    private boolean skipLockedEntity;
+
+    static {
+        NOWAIT = new HashMap<String, Object>();
+        NOWAIT.put("javax.persistence.lock.timeout", 0L);
+    }
 
     private static final class DataHolder {
         private Exchange exchange;
@@ -281,6 +291,21 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
         this.transacted = transacted;
     }
 
+    /**
+     * Sets whether to use NOWAIT on lock and silently skip the entity. This
+     * allows different instances to process entities at the same time but not
+     * processing the same entity.
+     * 
+     * @param skipLockedEntity
+     */
+    public void setSkipLockedEntity(boolean skipLockedEntity) {
+        this.skipLockedEntity = skipLockedEntity;
+    }
+
+    public boolean isSkipLockedEntity() {
+        return skipLockedEntity;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
 
@@ -298,12 +323,20 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
         }
         try {
             LOG.debug("Acquiring exclusive lock on entity: {}", entity);
-            entityManager.lock(entity, lockModeType);
+            if (isSkipLockedEntity()) {
+                entityManager.lock(entity, lockModeType, NOWAIT);
+            } else {
+                entityManager.lock(entity, lockModeType);
+            }
             return true;
         } catch (Exception e) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Failed to achieve lock on entity: " + entity + ". Reason: " + e, e);
             }
+            if (e instanceof PessimisticLockException || e instanceof OptimisticLockException) {
+                //transaction marked as rollback can't continue gracefully
+                throw (PersistenceException) e;
+            }
             //TODO: Find if possible an alternative way to handle results of native queries.
             //Result of native queries are Arrays and cannot be locked by all JPA Providers.
             if (entity.getClass().isArray()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/src/test/java/org/apache/camel/examples/VersionedItem.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/examples/VersionedItem.java b/components/camel-jpa/src/test/java/org/apache/camel/examples/VersionedItem.java
new file mode 100644
index 0000000..d76265a
--- /dev/null
+++ b/components/camel-jpa/src/test/java/org/apache/camel/examples/VersionedItem.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.examples;
+
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+import javax.persistence.Version;
+
+/**
+ * Represents a task which is added to the database, then removed from the
+ * database when it is consumed with a version column
+ * 
+ * @version
+ */
+@Entity
+public class VersionedItem {
+    @Id
+    @GeneratedValue
+    private Long id;
+    private String name;
+    @Version
+    private Long version;
+
+    public VersionedItem() {
+    }
+
+    public VersionedItem(String name) {
+        setName(name);
+    }
+
+    public Long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Long getVersion() {
+        return version;
+    }
+
+    public void setVersion(Long version) {
+        this.version = version;
+    }
+
+    @Override
+    public String toString() {
+        return "VersionedItem [id=" + id + ", name=" + name + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSkipLockedEntityTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSkipLockedEntityTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSkipLockedEntityTest.java
new file mode 100644
index 0000000..3c95a06
--- /dev/null
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSkipLockedEntityTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.jpa;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.examples.VersionedItem;
+import org.apache.camel.spring.SpringRouteBuilder;
+import org.hibernate.engine.spi.SessionImplementor;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class JpaRouteSkipLockedEntityTest extends AbstractJpaTest {
+    protected static final String SELECT_ALL_STRING = "select x from " + VersionedItem.class.getName() + " x";
+    
+    private int count;
+    private final ReentrantLock lock = new ReentrantLock();
+    private Condition cond1 = lock.newCondition();
+    
+    @Test
+    public void testRouteJpa() throws Exception {
+        MockEndpoint mock1 = getMockEndpoint("mock:result1");
+        mock1.expectedMessageCount(2);
+        MockEndpoint mock2 = getMockEndpoint("mock:result2");
+        mock2.expectedMessageCount(2);
+
+        template.sendBody("jpa://" + VersionedItem.class.getName(), new VersionedItem("one"));
+        template.sendBody("jpa://" + VersionedItem.class.getName(), new VersionedItem("two"));
+        template.sendBody("jpa://" + VersionedItem.class.getName(), new VersionedItem("three"));
+        template.sendBody("jpa://" + VersionedItem.class.getName(), new VersionedItem("four"));
+
+        this.context.startRoute("first");
+        this.context.startRoute("second");
+
+        assertMockEndpointsSatisfied();
+       
+        //force test to wait till finished
+        this.context.stopRoute("first");
+        this.context.stopRoute("second");
+
+        setLockTimeout(60);
+        List<?> list = entityManager.createQuery(selectAllString()).getResultList();
+        assertEquals(0, list.size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new SpringRouteBuilder() {
+            @Override
+            public void configure() {
+                String options = "?consumer.skipLockedEntity=true"; //&consumer.lockModeType=PESSIMISTIC_FORCE_INCREMENT";
+                from("jpa://" + VersionedItem.class.getName() + options).routeId("first").autoStartup(false).bean(new WaitLatch()).log("route1: ${body}").to("mock:result1");
+                from("jpa2://select" + options + "&consumer.query=select s from VersionedItem s").routeId("second").autoStartup(false).bean(new WaitLatch()).log("route2: ${body}").to("mock:result2");
+            }
+        };
+    }
+
+    @Override
+    protected String routeXml() {
+        return "org/apache/camel/processor/jpa/springJpaRouteSkipLockedTest.xml";
+    }
+
+    @Override
+    protected String selectAllString() {
+        return SELECT_ALL_STRING;
+    }
+
+    public class WaitLatch {
+        public void onMessage(VersionedItem body) throws Exception {
+            lock.lock();
+            try {
+
+                count++;
+
+                // if (count != 1) {
+                cond1.signal();
+                // }
+
+                // if not last
+                if (count != 4) {
+                    cond1.await();
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        setLockTimeout(0);
+    }
+
+    public void setLockTimeout(int timeout) throws SQLException {
+        entityManager.getTransaction().begin();
+        SessionImplementor session = entityManager.unwrap(SessionImplementor.class);
+        Connection connection = session.connection();
+        connection.createStatement().execute("CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY('derby.locks.waitTimeout', '" + timeout + "')");
+        entityManager.getTransaction().commit();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/src/test/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/resources/META-INF/persistence.xml b/components/camel-jpa/src/test/resources/META-INF/persistence.xml
index f799e37..b6cdcdf 100644
--- a/components/camel-jpa/src/test/resources/META-INF/persistence.xml
+++ b/components/camel-jpa/src/test/resources/META-INF/persistence.xml
@@ -88,4 +88,15 @@
   </persistence-unit>
   <!-- END SNIPPET: e2 -->
 
+  <persistence-unit name="hibernate" transaction-type="RESOURCE_LOCAL">
+       <provider>org.hibernate.ejb.HibernatePersistence</provider>
+    <class>org.apache.camel.examples.VersionedItem</class>
+       
+  <properties>
+     <property name="hibernate.connection.driver_class" value="org.apache.derby.jdbc.EmbeddedDriver" />
+     <property name="hibernate.connection.url" value="jdbc:derby:target/hibernate;create=true" />
+     <property name="hibernate.hbm2ddl.auto" value="create"/>
+   </properties>
+  </persistence-unit>
+  
 </persistence>

http://git-wip-us.apache.org/repos/asf/camel/blob/4ecb445e/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRouteSkipLockedTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRouteSkipLockedTest.xml b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRouteSkipLockedTest.xml
new file mode 100644
index 0000000..7cff3ee
--- /dev/null
+++ b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/springJpaRouteSkipLockedTest.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+       this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+       "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+       Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+       WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations
+       under the License. -->
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+       <bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
+               <property name="transactionManager" ref="tm"/>
+       </bean>
+
+       <bean class="org.springframework.orm.jpa.JpaTransactionManager" id="tm">
+               <property name="entityManagerFactory" ref="entityManagerFactory" />
+       </bean>
+
+       <bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean">
+               <property name="persistenceUnitName" value="hibernate" />
+               <property name="jpaDialect">
+                       <bean class="org.springframework.orm.jpa.vendor.HibernateJpaDialect" />
+               </property>
+       </bean>
+
+       <bean class="org.apache.camel.component.jpa.JpaComponent" id="jpa">
+               <property name="entityManagerFactory" ref="entityManagerFactory" />
+       </bean>
+       <bean class="org.apache.camel.component.jpa.JpaComponent" id="jpa2">
+               <property name="entityManagerFactory" ref="entityManagerFactory" />
+       </bean>
+
+</beans>
\ No newline at end of file

Reply via email to