Author: davsclaus Date: Tue Nov 15 14:48:05 2011 New Revision: 1202215 URL: http://svn.apache.org/viewvc?rev=1202215&view=rev Log: CAMEL-4683: Added consumer.transcted option to JpaConsumer to control TX behavior.
Added: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java - copied, changed from r1202071, camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1202215&r1=1202214&r2=1202215&view=diff ============================================================================== --- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original) +++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Tue Nov 15 14:48:05 2011 @@ -54,6 +54,7 @@ public class JpaConsumer extends Schedul private String nativeQuery; private Class<?> resultClass; private int maxMessagesPerPoll; + private boolean transacted; private volatile ShutdownRunningTask shutdownRunningTask; private volatile int pendingExchanges; @@ -96,17 +97,29 @@ public class JpaConsumer extends Schedul answer.add(holder); } - int messagePolled; + PersistenceException cause = null; + int messagePolled = 0; try { messagePolled = processBatch(CastUtils.cast(answer)); } catch (Exception e) { if (e instanceof PersistenceException) { - throw (PersistenceException) e; + cause = (PersistenceException) e; } else { - throw new PersistenceException(e); + cause = new PersistenceException(e); } } + if (cause != null) { + if (!isTransacted()) { + LOG.warn("Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.", cause.getMessage(), cause); + entityManager.flush(); + } else { + // rollback all by throwning exception + throw cause; + } + } + + // commit LOG.debug("Flushing EntityManager"); entityManager.flush(); return messagePolled; @@ -270,7 +283,22 @@ public class JpaConsumer extends Schedul public void setResultClass(Class<?> resultClass) { this.resultClass = resultClass; - } + } + + public boolean isTransacted() { + return transacted; + } + + /** + * Sets whether to run in transacted mode or not. + * <p/> + * This option is default <tt>false</tt>. When <tt>false</tt> then all the good messages + * will commit, and the first failed message will rollback. + * However when <tt>true</tt>, then all messages will rollback, if just one message failed. + */ + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } // Implementation methods // ------------------------------------------------------------------------- Copied: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java (from r1202071, camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java?p2=camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java&p1=camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java&r1=1202071&r2=1202215&rev=1202215&view=diff ============================================================================== --- camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java (original) +++ camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java Tue Nov 15 14:48:05 2011 @@ -23,7 +23,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.examples.SendEmail; import org.apache.camel.spring.SpringCamelContext; import org.apache.camel.test.junit4.CamelTestSupport; @@ -40,34 +39,38 @@ import org.springframework.transaction.s /** * @version */ -public class JpaTXRollbackTest extends CamelTestSupport { +public class JpaNonTXRollbackTest extends CamelTestSupport { protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName() + " x"; private static AtomicInteger foo = new AtomicInteger(); private static AtomicInteger bar = new AtomicInteger(); + private static AtomicInteger kaboom = new AtomicInteger(); protected ApplicationContext applicationContext; protected JpaTemplate jpaTemplate; @Test - public void testTXRollback() throws Exception { + public void testNonTXRollback() throws Exception { // first create three records template.sendBody("jpa://" + SendEmail.class.getName(), new SendEmail("f...@beer.org")); template.sendBody("jpa://" + SendEmail.class.getName(), new SendEmail("b...@beer.org")); template.sendBody("jpa://" + SendEmail.class.getName(), new SendEmail("kab...@beer.org")); - // should rollback the entire - MockEndpoint mock = getMockEndpoint("mock:result"); - // we should retry and try again - mock.expectedMinimumMessageCount(4); + // should only rollback the failed + getMockEndpoint("mock:start").expectedMinimumMessageCount(4); + // and only the 2 good messages goes here + getMockEndpoint("mock:result").expectedMessageCount(2); // start route context.startRoute("foo"); assertMockEndpointsSatisfied(); - assertTrue("Should be >= 2, was: " + foo.intValue(), foo.intValue() >= 2); - assertTrue("Should be >= 2, was: " + bar.intValue(), bar.intValue() >= 2); + assertEquals(1, foo.intValue()); + assertEquals(1, bar.intValue()); + + // kaboom fails and we retry it again + assertTrue("Should be >= 2, was: " + kaboom.intValue(), kaboom.intValue() >= 2); } @Override @@ -75,12 +78,14 @@ public class JpaTXRollbackTest extends C return new RouteBuilder() { @Override public void configure() throws Exception { - from("jpa://" + SendEmail.class.getName() + "?delay=2000").routeId("foo").noAutoStartup() + from("jpa://" + SendEmail.class.getName() + "?consumer.transacted=false&delay=1000").routeId("foo").noAutoStartup() + .to("mock:start") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { SendEmail send = exchange.getIn().getBody(SendEmail.class); if ("kab...@beer.org".equals(send.getAddress())) { + kaboom.incrementAndGet(); throw new IllegalArgumentException("Forced"); } Modified: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java?rev=1202215&r1=1202214&r2=1202215&view=diff ============================================================================== --- camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java (original) +++ camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java Tue Nov 15 14:48:05 2011 @@ -75,7 +75,7 @@ public class JpaTXRollbackTest extends C return new RouteBuilder() { @Override public void configure() throws Exception { - from("jpa://" + SendEmail.class.getName() + "?delay=2000").routeId("foo").noAutoStartup() + from("jpa://" + SendEmail.class.getName() + "?consumer.transacted=true&delay=1000").routeId("foo").noAutoStartup() .process(new Processor() { @Override public void process(Exchange exchange) throws Exception {