Author: davsclaus Date: Sat Oct 6 06:09:45 2012 New Revision: 1394934 URL: http://svn.apache.org/viewvc?rev=1394934&view=rev Log: CAMEL-5681: Fixed recipientlist/multicast so that the redelivery exhaust property is set correctly, depending on whether an unexpected exception was thrown or an exception occurred during routing. Also, doTry .. doCatch now resets the exhaust property to ensure the exception is handled like the error handler would do in RedeliveryErrorHandler.
Modified: camel/branches/camel-2.10.x/ (props changed) camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/onexception/DoCatchDirectRecipientListTest.java Propchange: camel/branches/camel-2.10.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1394932 Propchange: camel/branches/camel-2.10.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1394934&r1=1394933&r2=1394934&view=diff ============================================================================== --- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat Oct 6 06:09:45 2012 @@ -200,19 +200,11 @@ public class MulticastProcessor extends final AtomicExchange result = new AtomicExchange(); final Iterable<ProcessorExchangePair> pairs; - // multicast uses fine grained error handling on the output processors - // so use try .. 
catch to cater for this - boolean exhaust = false; try { boolean sync = true; pairs = createProcessorExchangePairs(exchange); - // after we have created the processors we consider the exchange as exhausted if an unhandled - // exception was thrown, (used in the catch block) - // if the processors is working in Streaming model, the exchange could not be processed at this point. - exhaust = !isStreaming(); - if (isParallelProcessing()) { // ensure an executor is set when running in parallel ObjectHelper.notNull(executorService, "executorService", this); @@ -228,15 +220,16 @@ public class MulticastProcessor extends } } catch (Throwable e) { exchange.setException(e); + // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted // and do the done work - doDone(exchange, null, callback, true, exhaust); + doDone(exchange, null, callback, true, false); return true; } // multicasting was processed successfully // and do the done work Exchange subExchange = result.get() != null ? 
result.get() : null; - doDone(exchange, subExchange, callback, true, exhaust); + doDone(exchange, subExchange, callback, true, true); return true; } @@ -308,7 +301,8 @@ public class MulticastProcessor extends // throw caused exception if (subExchange.getException() != null) { // wrap in exception to explain where it failed - throw new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException()); + CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException()); + subExchange.setException(cause); } } @@ -527,14 +521,14 @@ public class MulticastProcessor extends if (stopOnException && !continueProcessing) { if (subExchange.getException() != null) { // wrap in exception to explain where it failed - throw new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException()); - } else { - // we want to stop on exception, and the exception was handled by the error handler - // this is similar to what the pipeline does, so we should do the same to not surprise end users - // so we should set the failed exchange as the result and be done - result.set(subExchange); - return true; + CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException()); + subExchange.setException(cause); } + // we want to stop on exception, and the exception was handled by the error handler + // this is similar to what the pipeline does, so we should do the same to not surprise end users + // so we should set the failed exchange as the result and be done + result.set(subExchange); + return true; } LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange); Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java?rev=1394934&r1=1394933&r2=1394934&view=diff ============================================================================== --- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java (original) +++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java Sat Oct 6 06:09:45 2012 @@ -361,6 +361,8 @@ public class TryProcessor extends Servic // give the rest of the pipeline another chance exchange.setProperty(Exchange.EXCEPTION_CAUGHT, caught); exchange.setException(null); + // and we should not be regarded as exhausted as we are in a try .. catch block + exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); // is the exception handled by the catch clause final Boolean handled = catchClause.handles(exchange); Modified: camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java?rev=1394934&r1=1394933&r2=1394934&view=diff ============================================================================== --- camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java (original) +++ camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java Sat Oct 6 06:09:45 2012 @@ -67,8 +67,7 @@ public class MulticastParallelStopOnExce template.sendBody("direct:start", "Kaboom"); fail("Should thrown an exception"); } catch (CamelExecutionException e) { - ExecutionException ee = assertIsInstanceOf(ExecutionException.class, e.getCause()); - CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, ee.getCause()); + 
CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause()); assertTrue(cause.getMessage().startsWith("Parallel processing failed for number ")); assertTrue(cause.getMessage().contains("Exchange[Message: Kaboom]")); assertEquals("Forced", cause.getCause().getMessage()); Modified: camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java?rev=1394934&r1=1394933&r2=1394934&view=diff ============================================================================== --- camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java (original) +++ camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java Sat Oct 6 06:09:45 2012 @@ -123,10 +123,9 @@ public class RecipientListParallelFineGr fail("Should throw exception"); } catch (CamelExecutionException e) { // expected - assertIsInstanceOf(ExecutionException.class, e.getCause()); - assertIsInstanceOf(CamelExchangeException.class, e.getCause().getCause()); - assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause().getCause()); - assertEquals("Damn", e.getCause().getCause().getCause().getMessage()); + assertIsInstanceOf(CamelExchangeException.class, e.getCause()); + assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause()); + assertEquals("Damn", e.getCause().getCause().getMessage()); } assertMockEndpointsSatisfied(); Modified: camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java?rev=1394934&r1=1394933&r2=1394934&view=diff ============================================================================== --- camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java (original) +++ camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java Sat Oct 6 06:09:45 2012 @@ -64,8 +64,7 @@ public class SplitterParallelStopOnExcep template.sendBody("direct:start", "Hello World,Goodday World,Kaboom,Bye World"); fail("Should thrown an exception"); } catch (CamelExecutionException e) { - ExecutionException ee = assertIsInstanceOf(ExecutionException.class, e.getCause()); - CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, ee.getCause()); + CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause()); assertTrue(cause.getMessage().startsWith("Parallel processing failed for number ")); assertTrue(cause.getMessage().contains("Exchange[Message: Kaboom]")); assertEquals("Forced", cause.getCause().getMessage()); Modified: camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/onexception/DoCatchDirectRecipientListTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/onexception/DoCatchDirectRecipientListTest.java?rev=1394934&r1=1394933&r2=1394934&view=diff ============================================================================== --- camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/onexception/DoCatchDirectRecipientListTest.java (original) +++ camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/onexception/DoCatchDirectRecipientListTest.java Sat Oct 6 06:09:45 2012 @@ -24,18 
+24,12 @@ import org.apache.camel.builder.RouteBui */ public class DoCatchDirectRecipientListTest extends ContextTestSupport { - // TODO: CAMEL-5681 - public void testDoCatchDirectRoute() throws Exception { getMockEndpoint("mock:a").expectedMessageCount(1); getMockEndpoint("mock:b").expectedMessageCount(1); - // getMockEndpoint("mock:c").expectedMessageCount(1); + getMockEndpoint("mock:c").expectedMessageCount(1); - try { - template.sendBody("direct:start", "Hello World"); - } catch (Exception e) { - // should not happen - } + template.sendBody("direct:start", "Hello World"); assertMockEndpointsSatisfied(); }