Author: davsclaus
Date: Sat Oct  6 06:02:31 2012
New Revision: 1394932

URL: http://svn.apache.org/viewvc?rev=1394932&view=rev
Log:
CAMEL-5681: Fixed recipientlist/multicast so depending on unexpected or 
exception during routing sets the redelivery exhaust property correctly. And 
that the doTry .. doCatch will reset the exhaust property to esnure exception 
is handled like the error handler would do in RedeliveryErrorHandler.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/DoCatchDirectRecipientListTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1394932&r1=1394931&r2=1394932&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Sat Oct  6 06:02:31 2012
@@ -200,19 +200,11 @@ public class MulticastProcessor extends 
         final AtomicExchange result = new AtomicExchange();
         final Iterable<ProcessorExchangePair> pairs;
 
-        // multicast uses fine grained error handling on the output processors
-        // so use try .. catch to cater for this
-        boolean exhaust = false;
         try {
             boolean sync = true;
 
             pairs = createProcessorExchangePairs(exchange);
 
-            // after we have created the processors we consider the exchange 
as exhausted if an unhandled
-            // exception was thrown, (used in the catch block)
-            // if the processors is working in Streaming model, the exchange 
could not be processed at this point.
-            exhaust = !isStreaming();
-
             if (isParallelProcessing()) {
                 // ensure an executor is set when running in parallel
                 ObjectHelper.notNull(executorService, "executorService", this);
@@ -228,15 +220,16 @@ public class MulticastProcessor extends 
             }
         } catch (Throwable e) {
             exchange.setException(e);
+            // unexpected exception was thrown, maybe from iterator etc. so do 
not regard as exhausted
             // and do the done work
-            doDone(exchange, null, callback, true, exhaust);
+            doDone(exchange, null, callback, true, false);
             return true;
         }
 
         // multicasting was processed successfully
         // and do the done work
         Exchange subExchange = result.get() != null ? result.get() : null;
-        doDone(exchange, subExchange, callback, true, exhaust);
+        doDone(exchange, subExchange, callback, true, true);
         return true;
     }
 
@@ -308,7 +301,8 @@ public class MulticastProcessor extends 
                             // throw caused exception
                             if (subExchange.getException() != null) {
                                 // wrap in exception to explain where it failed
-                                throw new CamelExchangeException("Parallel 
processing failed for number " + number, subExchange, 
subExchange.getException());
+                                CamelExchangeException cause = new 
CamelExchangeException("Parallel processing failed for number " + number, 
subExchange, subExchange.getException());
+                                subExchange.setException(cause);
                             }
                         }
 
@@ -527,14 +521,14 @@ public class MulticastProcessor extends 
             if (stopOnException && !continueProcessing) {
                 if (subExchange.getException() != null) {
                     // wrap in exception to explain where it failed
-                    throw new CamelExchangeException("Sequential processing 
failed for number " + total.get(), subExchange, subExchange.getException());
-                } else {
-                    // we want to stop on exception, and the exception was 
handled by the error handler
-                    // this is similar to what the pipeline does, so we should 
do the same to not surprise end users
-                    // so we should set the failed exchange as the result and 
be done
-                    result.set(subExchange);
-                    return true;
+                    CamelExchangeException cause = new 
CamelExchangeException("Sequential processing failed for number " + 
total.get(), subExchange, subExchange.getException());
+                    subExchange.setException(cause);
                 }
+                // we want to stop on exception, and the exception was handled 
by the error handler
+                // this is similar to what the pipeline does, so we should do 
the same to not surprise end users
+                // so we should set the failed exchange as the result and be 
done
+                result.set(subExchange);
+                return true;
             }
 
             LOG.trace("Sequential processing complete for number {} exchange: 
{}", total, subExchange);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java?rev=1394932&r1=1394931&r2=1394932&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
 Sat Oct  6 06:02:31 2012
@@ -361,6 +361,8 @@ public class TryProcessor extends Servic
             // give the rest of the pipeline another chance
             exchange.setProperty(Exchange.EXCEPTION_CAUGHT, caught);
             exchange.setException(null);
+            // and we should not be regarded as exhausted as we are in a try 
.. catch block
+            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
 
             // is the exception handled by the catch clause
             final Boolean handled = catchClause.handles(exchange);

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java?rev=1394932&r1=1394931&r2=1394932&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
 Sat Oct  6 06:02:31 2012
@@ -67,8 +67,7 @@ public class MulticastParallelStopOnExce
             template.sendBody("direct:start", "Kaboom");
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
-            ExecutionException ee = 
assertIsInstanceOf(ExecutionException.class, e.getCause());
-            CamelExchangeException cause = 
assertIsInstanceOf(CamelExchangeException.class, ee.getCause());
+            CamelExchangeException cause = 
assertIsInstanceOf(CamelExchangeException.class, e.getCause());
             assertTrue(cause.getMessage().startsWith("Parallel processing 
failed for number "));
             assertTrue(cause.getMessage().contains("Exchange[Message: 
Kaboom]"));
             assertEquals("Forced", cause.getCause().getMessage());

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java?rev=1394932&r1=1394931&r2=1394932&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java
 Sat Oct  6 06:02:31 2012
@@ -123,10 +123,9 @@ public class RecipientListParallelFineGr
             fail("Should throw exception");
         } catch (CamelExecutionException e) {
             // expected
-            assertIsInstanceOf(ExecutionException.class, e.getCause());
-            assertIsInstanceOf(CamelExchangeException.class, 
e.getCause().getCause());
-            assertIsInstanceOf(IllegalArgumentException.class, 
e.getCause().getCause().getCause());
-            assertEquals("Damn", 
e.getCause().getCause().getCause().getMessage());
+            assertIsInstanceOf(CamelExchangeException.class, e.getCause());
+            assertIsInstanceOf(IllegalArgumentException.class, 
e.getCause().getCause());
+            assertEquals("Damn", e.getCause().getCause().getMessage());
         }
 
         assertMockEndpointsSatisfied();

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java?rev=1394932&r1=1394931&r2=1394932&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
 Sat Oct  6 06:02:31 2012
@@ -64,8 +64,7 @@ public class SplitterParallelStopOnExcep
             template.sendBody("direct:start", "Hello World,Goodday 
World,Kaboom,Bye World");
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
-            ExecutionException ee = 
assertIsInstanceOf(ExecutionException.class, e.getCause());
-            CamelExchangeException cause = 
assertIsInstanceOf(CamelExchangeException.class, ee.getCause());
+            CamelExchangeException cause = 
assertIsInstanceOf(CamelExchangeException.class, e.getCause());
             assertTrue(cause.getMessage().startsWith("Parallel processing 
failed for number "));
             assertTrue(cause.getMessage().contains("Exchange[Message: 
Kaboom]"));
             assertEquals("Forced", cause.getCause().getMessage());

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/DoCatchDirectRecipientListTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/DoCatchDirectRecipientListTest.java?rev=1394932&r1=1394931&r2=1394932&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/DoCatchDirectRecipientListTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/DoCatchDirectRecipientListTest.java
 Sat Oct  6 06:02:31 2012
@@ -24,18 +24,12 @@ import org.apache.camel.builder.RouteBui
  */
 public class DoCatchDirectRecipientListTest extends ContextTestSupport {
 
-    // TODO: CAMEL-5681
-
     public void testDoCatchDirectRoute() throws Exception {
         getMockEndpoint("mock:a").expectedMessageCount(1);
         getMockEndpoint("mock:b").expectedMessageCount(1);
-        // getMockEndpoint("mock:c").expectedMessageCount(1);
+        getMockEndpoint("mock:c").expectedMessageCount(1);
 
-        try {
-            template.sendBody("direct:start", "Hello World");
-        } catch (Exception e) {
-            // should not happen
-        }
+        template.sendBody("direct:start", "Hello World");
 
         assertMockEndpointsSatisfied();
     }


Reply via email to