Author: davsclaus Date: Tue Nov 15 15:01:15 2011 New Revision: 1202225 URL: http://svn.apache.org/viewvc?rev=1202225&view=rev Log: CAMEL-4683: Added consumer.transcted option to JpaConsumer to control TX behavior.
Added: camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java - copied unchanged from r1202222, camel/branches/camel-2.8.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaNonTXRollbackTest.java Modified: camel/branches/camel-2.7.x/ (props changed) camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java Propchange: camel/branches/camel-2.7.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Nov 15 15:01:15 2011 @@ -1,2 +1,2 @@ -/camel/branches/camel-2.8.x:1170965,1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732,1199766,1199807,1200867,1201638-1201639,1202171 -/camel/trunk:1146608,1146903,1147216,1170956,1171396,1174565,1175321,1176274,1176781-1176782,1177394,1177945,1177948,1180597,1187221,1189693,1199137,1199703,1199739,1199804,1200861,1201623,1201637,1202167 +/camel/branches/camel-2.8.x:1170965,1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732,1199766,1199807,1200867,1201638-1201639,1202171,1202222 +/camel/trunk:1146608,1146903,1147216,1170956,1171396,1174565,1175321,1176274,1176781-1176782,1177394,1177945,1177948,1180597,1187221,1189693,1199137,1199703,1199739,1199804,1200861,1201623,1201637,1202167,1202215 Propchange: camel/branches/camel-2.7.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1202225&r1=1202224&r2=1202225&view=diff ============================================================================== --- camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original) +++ camel/branches/camel-2.7.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Tue Nov 15 15:01:15 2011 @@ -54,6 +54,7 @@ public class JpaConsumer extends Schedul private String nativeQuery; private Class resultClass; private int maxMessagesPerPoll; + private boolean transacted; private volatile ShutdownRunningTask shutdownRunningTask; private volatile int pendingExchanges; @@ -96,17 +97,29 @@ public class JpaConsumer extends Schedul answer.add(holder); } - int messagePolled; + PersistenceException cause = null; + int messagePolled = 0; try { messagePolled = processBatch(CastUtils.cast(answer)); } catch (Exception e) { if (e instanceof PersistenceException) { - throw (PersistenceException) e; + cause = (PersistenceException) e; } else { - throw new PersistenceException(e); + cause = new PersistenceException(e); } } + if (cause != null) { + if (!isTransacted()) { + LOG.warn("Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.", cause.getMessage(), cause); + entityManager.flush(); + } else { + // rollback all by throwning exception + throw cause; + } + } + + // commit LOG.debug("Flushing EntityManager"); entityManager.flush(); return messagePolled; @@ -260,7 +273,22 @@ public class JpaConsumer extends Schedul public void setResultClass(Class resultClass) { this.resultClass = resultClass; - } + } + + public boolean isTransacted() { + return transacted; + } + + /** + * Sets whether to run in transacted mode or not. + * <p/> + * This option is default <tt>false</tt>. When <tt>false</tt> then all the good messages + * will commit, and the first failed message will rollback. + * However when <tt>true</tt>, then all messages will rollback, if just one message failed. + */ + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } // Implementation methods // ------------------------------------------------------------------------- Modified: camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java?rev=1202225&r1=1202224&r2=1202225&view=diff ============================================================================== --- camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java (original) +++ camel/branches/camel-2.7.x/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java Tue Nov 15 15:01:15 2011 @@ -75,7 +75,7 @@ public class JpaTXRollbackTest extends C return new RouteBuilder() { @Override public void configure() throws Exception { - from("jpa://" + SendEmail.class.getName() + "?delay=2000").routeId("foo").noAutoStartup() + from("jpa://" + SendEmail.class.getName() + "?consumer.transacted=true&delay=1000").routeId("foo").noAutoStartup() .process(new Processor() { @Override public void process(Exchange exchange) throws Exception {