Author: davsclaus
Date: Tue Nov 15 14:48:05 2011
New Revision: 1202215

URL: http://svn.apache.org/viewvc?rev=1202215&view=rev
Log:
CAMEL-4683: Added consumer.transacted option to JpaConsumer to control TX 
behavior.

Added:
    
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java
      - copied, changed from r1202071, 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
Modified:
    
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
    
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java

Modified: 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1202215&r1=1202214&r2=1202215&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
 (original)
+++ 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
 Tue Nov 15 14:48:05 2011
@@ -54,6 +54,7 @@ public class JpaConsumer extends Schedul
     private String nativeQuery;
     private Class<?> resultClass;
     private int maxMessagesPerPoll;
+    private boolean transacted;
     private volatile ShutdownRunningTask shutdownRunningTask;
     private volatile int pendingExchanges;
 
@@ -96,17 +97,29 @@ public class JpaConsumer extends Schedul
                     answer.add(holder);
                 }
 
-                int messagePolled;
+                PersistenceException cause = null;
+                int messagePolled = 0;
                 try {
                     messagePolled = processBatch(CastUtils.cast(answer));
                 } catch (Exception e) {
                     if (e instanceof PersistenceException) {
-                        throw (PersistenceException) e;
+                        cause = (PersistenceException) e;
                     } else {
-                        throw new PersistenceException(e);
+                        cause = new PersistenceException(e);
                     }
                 }
 
+                if (cause != null) {
+                    if (!isTransacted()) {
+                        LOG.warn("Error processing last message due: {}. Will 
commit all previous successful processed message, and ignore this last 
failure.", cause.getMessage(), cause);
+                        entityManager.flush();
+                    } else {
+                        // rollback all by throwning exception
+                        throw cause;
+                    }
+                }
+
+                // commit
                 LOG.debug("Flushing EntityManager");
                 entityManager.flush();
                 return messagePolled;
@@ -270,7 +283,22 @@ public class JpaConsumer extends Schedul
 
     public void setResultClass(Class<?> resultClass) {
         this.resultClass = resultClass;
-    }    
+    }
+
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    /**
+     * Sets whether to run in transacted mode or not.
+     * <p/>
+     * This option is default <tt>false</tt>. When <tt>false</tt> then all the 
good messages
+     * will commit, and the first failed message will rollback.
+     * However when <tt>true</tt>, then all messages will rollback, if just 
one message failed.
+     */
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
 
     // Implementation methods
     // 
-------------------------------------------------------------------------

Copied: 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java
 (from r1202071, 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java?p2=camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java&p1=camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java&r1=1202071&r2=1202215&rev=1202215&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
 (original)
+++ 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java
 Tue Nov 15 14:48:05 2011
@@ -23,7 +23,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.examples.SendEmail;
 import org.apache.camel.spring.SpringCamelContext;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -40,34 +39,38 @@ import org.springframework.transaction.s
 /**
  * @version 
  */
-public class JpaTXRollbackTest extends CamelTestSupport {
+public class JpaNonTXRollbackTest extends CamelTestSupport {
 
     protected static final String SELECT_ALL_STRING = "select x from " + 
SendEmail.class.getName() + " x";
     private static AtomicInteger foo = new AtomicInteger();
     private static AtomicInteger bar = new AtomicInteger();
+    private static AtomicInteger kaboom = new AtomicInteger();
 
     protected ApplicationContext applicationContext;
     protected JpaTemplate jpaTemplate;
 
     @Test
-    public void testTXRollback() throws Exception {
+    public void testNonTXRollback() throws Exception {
         // first create three records
         template.sendBody("jpa://" + SendEmail.class.getName(), new 
SendEmail("f...@beer.org"));
         template.sendBody("jpa://" + SendEmail.class.getName(), new 
SendEmail("b...@beer.org"));
         template.sendBody("jpa://" + SendEmail.class.getName(), new 
SendEmail("kab...@beer.org"));
 
-        // should rollback the entire
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        // we should retry and try again
-        mock.expectedMinimumMessageCount(4);
+        // should only rollback the failed
+        getMockEndpoint("mock:start").expectedMinimumMessageCount(4);
+        // and only the 2 good messages goes here
+        getMockEndpoint("mock:result").expectedMessageCount(2);
 
         // start route
         context.startRoute("foo");
 
         assertMockEndpointsSatisfied();
 
-        assertTrue("Should be >= 2, was: " + foo.intValue(), foo.intValue() >= 
2);
-        assertTrue("Should be >= 2, was: " + bar.intValue(), bar.intValue() >= 
2);
+        assertEquals(1, foo.intValue());
+        assertEquals(1, bar.intValue());
+
+        // kaboom fails and we retry it again
+        assertTrue("Should be >= 2, was: " + kaboom.intValue(), 
kaboom.intValue() >= 2);
     }
 
     @Override
@@ -75,12 +78,14 @@ public class JpaTXRollbackTest extends C
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("jpa://" + SendEmail.class.getName() + 
"?delay=2000").routeId("foo").noAutoStartup()
+                from("jpa://" + SendEmail.class.getName() + 
"?consumer.transacted=false&delay=1000").routeId("foo").noAutoStartup()
+                        .to("mock:start")
                         .process(new Processor() {
                             @Override
                             public void process(Exchange exchange) throws 
Exception {
                                 SendEmail send = 
exchange.getIn().getBody(SendEmail.class);
                                 if 
("kab...@beer.org".equals(send.getAddress())) {
+                                    kaboom.incrementAndGet();
                                     throw new 
IllegalArgumentException("Forced");
                                 }
 

Modified: 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java?rev=1202215&r1=1202214&r2=1202215&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
 (original)
+++ 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
 Tue Nov 15 14:48:05 2011
@@ -75,7 +75,7 @@ public class JpaTXRollbackTest extends C
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("jpa://" + SendEmail.class.getName() + 
"?delay=2000").routeId("foo").noAutoStartup()
+                from("jpa://" + SendEmail.class.getName() + 
"?consumer.transacted=true&delay=1000").routeId("foo").noAutoStartup()
                         .process(new Processor() {
                             @Override
                             public void process(Exchange exchange) throws 
Exception {


Reply via email to