Author: davsclaus
Date: Mon Dec 14 09:25:50 2009
New Revision: 890241

URL: http://svn.apache.org/viewvc?rev=890241&view=rev
Log:
CAMEL-2278: Multicast and recipientlist now use fine grained error handling on 
the output side to avoid starting redelivery all over again. Now redelivery only 
happens on the individual node that failed.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelFineGrainedErrorHandlingTest.java
      - copied, changed from r890206, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java
   (with props)
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExchangeException.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExchangeException.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExchangeException.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExchangeException.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExchangeException.java
 Mon Dec 14 09:25:50 2009
@@ -32,7 +32,7 @@
     }
 
     public CamelExchangeException(String message, Exchange exchange, Throwable 
cause) {
-        super(createMessage(message, exchange), cause);
+        super(createMessage(message, exchange, cause), cause);
         this.exchange = exchange;
     }
 
@@ -46,4 +46,8 @@
     protected static String createMessage(String message, Exchange exchange) {
         return message + " on the exchange: " + exchange;
     }
+
+    protected static String createMessage(String message, Exchange exchange, 
Throwable cause) {
+        return createMessage(message, exchange) + ". Cause by: [" + 
cause.getClass().getName() + " - " + cause.getMessage() + "]";
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java Mon Dec 
14 09:25:50 2009
@@ -114,9 +114,16 @@
     Processor getNextProcessor();
 
     /**
-     * Gets the defintion of the next processor
+     * Gets the definition of the next processor
      *
      * @return the processor definition
      */
     ProcessorDefinition<?> getProcessorDefinition();
+
+    /**
+     * Gets the {@link RouteContext}
+     *
+     * @return the route context
+     */
+    RouteContext getRouteContext();
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Mon Dec 
14 09:25:50 2009
@@ -94,6 +94,7 @@
     String ROUTE_STOP         = "CamelRouteStop";
     String REDELIVERED        = "CamelRedelivered";
     String REDELIVERY_COUNTER = "CamelRedeliveryCounter";
+    String REDELIVERY_EXHAUSTED = "CamelRedeliveryExhausted";
     String ROLLBACK_ONLY      = "CamelRollbackOnly";
     String ROLLBACK_ONLY_LAST = "CamelRollbackOnlyLast";
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
 Mon Dec 14 09:25:50 2009
@@ -144,7 +144,7 @@
             Processor processor = Pipeline.newInstance(eventDrivenProcessors);
 
             // and wrap it in a unit of work so the UoW is on the top, so the 
entire route will be in the same UoW
-            Processor unitOfWorkProcessor = new UnitOfWorkProcessor(processor);
+            Processor unitOfWorkProcessor = new UnitOfWorkProcessor(this, 
processor);
             Processor target = unitOfWorkProcessor;
 
             // and then optionally add route policy processor if a custom 
policy is set

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
 Mon Dec 14 09:25:50 2009
@@ -24,6 +24,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Service;
+import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.TracedRouteNodes;
 import org.apache.camel.spi.UnitOfWork;
@@ -45,6 +46,7 @@
     private Message originalInMessage;
     private final TracedRouteNodes tracedRouteNodes;
     private Set<Object> transactedBy;
+    private RouteContext routeContext;
 
     public DefaultUnitOfWork(Exchange exchange) {
         tracedRouteNodes = new DefaultTracedRouteNodes();
@@ -85,6 +87,7 @@
             transactedBy.clear();
         }
         originalInMessage = null;
+        routeContext = null;
     }
 
     public synchronized void addSynchronization(Synchronization 
synchronization) {
@@ -177,6 +180,14 @@
         getTransactedBy().remove(transactionDefinition);
     }
 
+    public RouteContext getRouteContext() {
+        return routeContext;
+    }
+
+    public void setRouteContext(RouteContext routeContext) {
+        this.routeContext = routeContext;
+    }
+
     private Set<Object> getTransactedBy() {
         if (transactedBy == null) {
             transactedBy = new LinkedHashSet<Object>();

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
 Mon Dec 14 09:25:50 2009
@@ -16,10 +16,8 @@
  */
 package org.apache.camel.model;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -27,11 +25,9 @@
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Processor;
-import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
-import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -141,14 +137,7 @@
             executorService = routeContext.lookup(executorServiceRef, 
ExecutorService.class);
         }
 
-        // wrap list of processors in error handlers so we have fine grained 
error handling
-        List<Processor> processors = new ArrayList<Processor>(list.size());
-        for (Processor output : list) {
-            Processor errorHandler = wrapInErrorHandler(routeContext, output);
-            processors.add(errorHandler);
-        }
-
-        return new MulticastProcessor(processors, aggregationStrategy, 
isParallelProcessing(), executorService,
+        return new MulticastProcessor(list, aggregationStrategy, 
isParallelProcessing(), executorService,
                                       isStreaming(), isStopOnException());
     }
 
@@ -193,4 +182,4 @@
         this.executorService = executorService;
     }
 
-}
\ No newline at end of file
+}

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
 Mon Dec 14 09:25:50 2009
@@ -74,7 +74,7 @@
         Processor childProcessor = createOutputsProcessor(routeContext);
 
         // wrap the on completion route in a unit of work processor
-        childProcessor = new UnitOfWorkProcessor(childProcessor);
+        childProcessor = new UnitOfWorkProcessor(routeContext, childProcessor);
 
         Predicate when = null;
         if (onWhen != null) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 Mon Dec 14 09:25:50 2009
@@ -180,13 +180,13 @@
         if (defn instanceof TryDefinition || defn instanceof CatchDefinition 
|| defn instanceof FinallyDefinition) {
             // do not use error handler for try .. catch .. finally blocks as 
it will handle errors itself
             return channel;
-        } else if (defn instanceof MulticastDefinition) {
-            // do not use error handler for multicast based as it offers fine 
grained error handlers for its outputs
+        } else if (defn instanceof MulticastDefinition || defn instanceof 
RecipientListDefinition) {
+            // do not use error handler for multicast or recipientlist based 
as it offers fine grained error handlers for its outputs
             return channel;
         } else {
             // regular definition so add the error handler
             Processor output = channel.getOutput();
-            Processor errorHandler = wrapInErrorHandler(routeContext, output);
+            Processor errorHandler = wrapInErrorHandler(routeContext, 
getErrorHandlerBuilder(), output);
             // set error handler on channel
             channel.setErrorHandler(errorHandler);
 
@@ -200,11 +200,10 @@
      * @param routeContext the route context
      * @param output the output
      * @return the output wrapped with the error handler
-     * @throws Exception can be thrown
+     * @throws Exception can be thrown if failed to create error handler 
builder
      */
-    protected Processor wrapInErrorHandler(RouteContext routeContext, 
Processor output) throws Exception {
+    protected Processor wrapInErrorHandler(RouteContext routeContext, 
ErrorHandlerBuilder builder, Processor output) throws Exception {
         // create error handler
-        ErrorHandlerBuilder builder = getErrorHandlerBuilder();
         Processor errorHandler = builder.createErrorHandler(routeContext, 
output);
 
         // invoke lifecycles so we can manage this error handler builder
@@ -215,6 +214,8 @@
         return errorHandler;
     }
 
+
+
     /**
      * Adds the given list of interceptors to the channel.
      *

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
 Mon Dec 14 09:25:50 2009
@@ -59,7 +59,7 @@
         Processor childProcessor = routeContext.createProcessor(this);
 
         // wrap it in a unit of work so the route that comes next is also done 
in a unit of work
-        UnitOfWorkProcessor uow = new UnitOfWorkProcessor(childProcessor);
+        UnitOfWorkProcessor uow = new UnitOfWorkProcessor(routeContext, 
childProcessor);
 
         return new ThreadsProcessor(uow, executorService, 
waitForTaskToComplete);
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java 
Mon Dec 14 09:25:50 2009
@@ -98,7 +98,7 @@
         Processor childProcessor = routeContext.createProcessor(this);
 
         // wrap it in a unit of work so the route that comes next is also done 
in a unit of work
-        UnitOfWorkProcessor uow = new UnitOfWorkProcessor(childProcessor);
+        UnitOfWorkProcessor uow = new UnitOfWorkProcessor(routeContext, 
childProcessor);
 
         // create async processor
         Endpoint endpoint = resolveEndpoint(routeContext);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
 Mon Dec 14 09:25:50 2009
@@ -61,6 +61,7 @@
     private ProcessorDefinition<?> definition;
     private ProcessorDefinition<?> childDefinition;
     private CamelContext camelContext;
+    private RouteContext routeContext;
 
     public List<Processor> next() {
         List<Processor> answer = new ArrayList<Processor>(1);
@@ -128,6 +129,10 @@
         this.childDefinition = childDefinition;
     }
 
+    public RouteContext getRouteContext() {
+        return routeContext;
+    }
+
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startServices(errorHandler, output);
@@ -139,6 +144,7 @@
     }
 
     public void initChannel(ProcessorDefinition<?> outputDefinition, 
RouteContext routeContext) throws Exception {
+        this.routeContext = routeContext;
         this.definition = outputDefinition;
         this.camelContext = routeContext.getCamelContext();
 
@@ -201,6 +207,11 @@
     }
 
     public void process(Exchange exchange) throws Exception {
+        if (exchange.getUnitOfWork() != null) {
+            // keep route context up to date
+            exchange.getUnitOfWork().setRouteContext(routeContext);
+        }
+
         Processor processor = getOutput();
         if (processor != null && continueProcessing(exchange)) {
             processor.process(exchange);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Mon Dec 14 09:25:50 2009
@@ -30,12 +30,15 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Channel;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.TracedRouteNodes;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -80,12 +83,13 @@
         }
     }
 
-    private final Collection<Processor> processors;
+    private Collection<Processor> processors;
     private final AggregationStrategy aggregationStrategy;
     private final boolean isParallelProcessing;
     private final boolean streaming;
     private final boolean stopOnException;
     private ExecutorService executorService;
+    private Channel channel;
 
     public MulticastProcessor(Collection<Processor> processors) {
         this(processors, null);
@@ -122,18 +126,36 @@
         return "multicast";
     }
 
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+
     public void process(Exchange exchange) throws Exception {
         final AtomicExchange result = new AtomicExchange();
         final Iterable<ProcessorExchangePair> pairs = 
createProcessorExchangePairs(exchange);
 
-        if (isParallelProcessing()) {
-            doProcessParallel(result, pairs, isStreaming());
-        } else {
-            doProcessSequential(result, pairs);
-        }
+        // multicast uses fine grained error handling on the output processors
+        // so use try .. catch to cater for this
+        try {
+            if (isParallelProcessing()) {
+                doProcessParallel(result, pairs, isStreaming());
+            } else {
+                doProcessSequential(result, pairs);
+            }
 
-        if (result.get() != null) {
-            ExchangeHelper.copyResults(exchange, result.get());
+            if (result.get() != null) {
+                ExchangeHelper.copyResults(exchange, result.get());
+            }
+        } catch (Exception e) {
+            // multicast uses error handling on its output processors and they 
have tried to redeliver
+            // so we shall signal back to the other error handlers that we are 
exhausted and they should not
+            // also try to redeliver as we will then do that twice
+            exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+            exchange.setException(e);
         }
     }
 
@@ -237,6 +259,19 @@
             // set property which endpoint we send to
             setToEndpoint(exchange, producer);
 
+            if (exchange.getUnitOfWork() != null && 
exchange.getUnitOfWork().getRouteContext() != null) {
+                // wrap the producer in error handler so we have fine grained 
error handling on
+                // the output side instead of the input side
+                // this is needed to support redelivery on that output alone 
and not doing redelivery
+                // for the entire multicast block again which will start from 
scratch again
+                RouteContext routeContext = 
exchange.getUnitOfWork().getRouteContext();
+                ErrorHandlerBuilder builder = 
routeContext.getRoute().getErrorHandlerBuilder();
+
+                // create error handler (create error handler directly to keep 
it light weight,
+                // instead of using 
ProcessorDefinitionHelper.wrapInErrorHandler)
+                producer = builder.createErrorHandler(routeContext, producer);
+            }
+
             // let the producer process it
             producer.process(exchange);
         } catch (Exception e) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 Mon Dec 14 09:25:50 2009
@@ -231,7 +231,10 @@
     protected boolean isDone(Exchange exchange) throws Exception {
         // only done if the exchange hasn't failed
         // and it has not been handled by the failure processor
-        return exchange.getException() == null || 
ExchangeHelper.isFailureHandled(exchange);
+        // or we are exhausted
+        return exchange.getException() == null
+            || ExchangeHelper.isFailureHandled(exchange)
+            || ExchangeHelper.isRedelieryExhausted(exchange);
     }
 
     /**

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
 Mon Dec 14 09:25:50 2009
@@ -19,6 +19,8 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultUnitOfWork;
+import org.apache.camel.spi.RouteContext;
+
 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
 
 /** 
@@ -27,10 +29,17 @@
  */
 public final class UnitOfWorkProcessor extends DelegateProcessor {
 
+    private final RouteContext routeContext;
+
     public UnitOfWorkProcessor(Processor processor) {
+        this(null, processor);
+    }
+
+    public UnitOfWorkProcessor(RouteContext routeContext, Processor processor) 
{
         super(processor);
+        this.routeContext = routeContext;
     }
-    
+
     @Override
     public String toString() {
         return "UnitOfWork(" + processor + ")";

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
 Mon Dec 14 09:25:50 2009
@@ -94,7 +94,6 @@
             return;
         }
 
-
         // interceptor will also trace routes supposed only for TraceEvents so 
we need to skip
         // logging TraceEvents to avoid infinite looping
         if (exchange.getProperty(Exchange.TRACE_EVENT, Boolean.class) != null) 
{

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java 
Mon Dec 14 09:25:50 2009
@@ -110,4 +110,24 @@
      * @param transactionDefinition the transaction definition
      */
     void endTransactedBy(Object transactionDefinition);
+
+    /**
+     * Gets the {@link RouteContext} that this {@link UnitOfWork} 
currently is being routed through.
+     * <p/>
+     * Notice that an {@link Exchange} can be routed through multiple 
routes and thus the
+     * {@link org.apache.camel.spi.RouteContext} can change over time.
+     *
+     * @return the route context
+     */
+    RouteContext getRouteContext();
+
+    /**
+     * Sets the {@link RouteContext} that this {@link UnitOfWork} 
currently is being routed through.
+     * <p/>
+     * Notice that an {@link Exchange} can be routed through multiple 
routes and thus the
+     * {@link org.apache.camel.spi.RouteContext} can change over time.
+     *
+     * @param routeContext the route context
+     */
+    void setRouteContext(RouteContext routeContext);
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java 
Mon Dec 14 09:25:50 2009
@@ -426,6 +426,11 @@
         exchange.setException(null);
     }
 
+    public static boolean isRedelieryExhausted(Exchange exchange) {
+        Boolean exhausted = 
exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.class);
+        return exhausted != null && exhausted;
+    }
+
     /**
      * Extracts the body from the given exchange.
      * <p/>

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java
 Mon Dec 14 09:25:50 2009
@@ -72,7 +72,7 @@
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
             CamelExchangeException ee = 
assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertEquals("Sequential processing failed for number 2 on the 
exchange: Exchange[Message: Kaboom]", ee.getMessage());
+            assertTrue(ee.getMessage().startsWith("Sequential processing 
failed for number 2 on the exchange: Exchange[Message: Kaboom]"));
             RollbackExchangeException re = 
assertIsInstanceOf(RollbackExchangeException.class, ee.getCause());
             assertEquals("Intended rollback on the exchange: Exchange[Message: 
Kaboom]", re.getMessage());
         }
@@ -94,7 +94,7 @@
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
             CamelExchangeException ee = 
assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertEquals("Sequential processing failed for number 3 on the 
exchange: Exchange[Message: Kaboom]", ee.getMessage());
+            assertTrue(ee.getMessage().startsWith("Sequential processing 
failed for number 3 on the exchange: Exchange[Message: Kaboom]"));
             RollbackExchangeException re = 
assertIsInstanceOf(RollbackExchangeException.class, ee.getCause());
             assertEquals("Intended rollback on the exchange: Exchange[Message: 
Kaboom]", re.getMessage());
         }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
 Mon Dec 14 09:25:50 2009
@@ -31,12 +31,14 @@
                 onException(Exception.class).maximumRedeliveries(2);
 
                 from("direct:start")
+                    .to("mock:a")
                     .multicast().stopOnException()
                     .to("mock:foo", "mock:bar", "mock:baz");
             }
         });
         context.start();
 
+        getMockEndpoint("mock:a").expectedMessageCount(1);
         getMockEndpoint("mock:foo").expectedMessageCount(1);
         getMockEndpoint("mock:bar").expectedMessageCount(1);
         getMockEndpoint("mock:baz").expectedMessageCount(1);
@@ -53,12 +55,14 @@
                 onException(Exception.class).maximumRedeliveries(2);
 
                 from("direct:start")
+                    .to("mock:a")
                     .multicast().stopOnException()
                     .to("mock:foo", "mock:bar").throwException(new IllegalArgumentException("Damn")).to("mock:baz");
             }
         });
         context.start();
 
+        getMockEndpoint("mock:a").expectedMessageCount(1);
         getMockEndpoint("mock:foo").expectedMessageCount(1);
         getMockEndpoint("mock:bar").expectedMessageCount(1);
         getMockEndpoint("mock:baz").expectedMessageCount(0);

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelFineGrainedErrorHandlingTest.java
 (from r890206, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelFineGrainedErrorHandlingTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelFineGrainedErrorHandlingTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java&r1=890206&r2=890241&rev=890241&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelFineGrainedErrorHandlingTest.java Mon Dec 14 09:25:50 2009
@@ -22,7 +22,7 @@
 /**
  * @version $Revision$
  */
-public class MulticastFineGrainedErrorHandlingTest extends ContextTestSupport {
+public class MulticastParallelFineGrainedErrorHandlingTest extends ContextTestSupport {
 
     public void testMulticastOk() throws Exception {
         context.addRoutes(new RouteBuilder() {
@@ -31,12 +31,14 @@
                 onException(Exception.class).maximumRedeliveries(2);
 
                 from("direct:start")
-                    .multicast().stopOnException()
+                    .to("mock:a")
+                    .multicast().stopOnException().parallelProcessing()
                     .to("mock:foo", "mock:bar", "mock:baz");
             }
         });
         context.start();
 
+        getMockEndpoint("mock:a").expectedMessageCount(1);
         getMockEndpoint("mock:foo").expectedMessageCount(1);
         getMockEndpoint("mock:bar").expectedMessageCount(1);
         getMockEndpoint("mock:baz").expectedMessageCount(1);
@@ -53,15 +55,17 @@
                 onException(Exception.class).maximumRedeliveries(2);
 
                 from("direct:start")
-                    .multicast().stopOnException()
+                    .to("mock:a")
+                    .multicast().stopOnException().parallelProcessing()
                     .to("mock:foo", "mock:bar").throwException(new IllegalArgumentException("Damn")).to("mock:baz");
             }
         });
         context.start();
 
+        getMockEndpoint("mock:a").expectedMessageCount(1);
         getMockEndpoint("mock:foo").expectedMessageCount(1);
         getMockEndpoint("mock:bar").expectedMessageCount(1);
-        getMockEndpoint("mock:baz").expectedMessageCount(0);
+        getMockEndpoint("mock:baz").expectedMessageCount(1);
 
         try {
             template.sendBody("direct:start", "Hello World");
@@ -77,4 +81,4 @@
     public boolean isUseRouteBuilder() {
         return false;
     }
-}
+}
\ No newline at end of file

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java Mon Dec 14 09:25:50 2009
@@ -57,7 +57,7 @@
             ExecutionException ee = assertIsInstanceOf(ExecutionException.class, e.getCause());
             CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, ee.getCause());
             assertTrue(cause.getMessage().startsWith("Parallel processing failed for number "));
-            assertTrue(cause.getMessage().endsWith("on the exchange: Exchange[Message: Kaboom]"));
+            assertTrue(cause.getMessage().contains("on the exchange: Exchange[Message: Kaboom]"));
             assertEquals("Forced", cause.getCause().getMessage());
         }
 
@@ -96,4 +96,4 @@
             }
         };
     }
-}
\ No newline at end of file
+}

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java Mon Dec 14 09:25:50 2009
@@ -51,7 +51,7 @@
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
             CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertEquals("Sequential processing failed for number 1 on the exchange: Exchange[Message: Kaboom]", cause.getMessage());
+            assertTrue(cause.getMessage().startsWith("Sequential processing failed for number 1 on the exchange: Exchange[Message: Kaboom]"));
             assertEquals("Forced", cause.getCause().getMessage());
         }
 
@@ -87,4 +87,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java?rev=890241&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java Mon Dec 14 09:25:50 2009
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+
+/**
+ * @version $Revision$
+ */
+public class RecipientListFineGrainedErrorHandlingTest extends ContextTestSupport {
+
+    private static int counter;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("fail", new MyFailBean());
+        return jndi;
+    }
+
+    public void testRecipientListOk() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class).maximumRedeliveries(2);
+
+                from("direct:start")
+                    .to("mock:a")
+                    .recipientList(header("foo")).stopOnException();
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:baz").expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", "mock:foo,mock:bar,mock:baz");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testRecipientListError() throws Exception {
+        counter = 0;
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class).maximumRedeliveries(2);
+
+                from("direct:start")
+                    .to("mock:a")
+                    .recipientList(header("foo")).stopOnException();
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:baz").expectedMessageCount(0);
+
+        try {
+            template.sendBodyAndHeader("direct:start", "Hello World", "foo", "mock:foo,mock:bar,bean:fail,mock:baz");
+            fail("Should throw exception");
+        } catch (Exception e) {
+            // expected
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(3, counter);
+    }
+
+    public void testRecipientListAsBeanError() throws Exception {
+        counter = 0;
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setTracing(true);
+
+                onException(Exception.class).maximumRedeliveries(2);
+
+                from("direct:start")
+                    .to("mock:a")
+                    .bean(MyRecipientBean.class);
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:baz").expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should throw exception");
+        } catch (CamelExecutionException e) {
+            // expected
+            assertIsInstanceOf(CamelExchangeException.class, e.getCause());
+            assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause());
+            assertEquals("Damn", e.getCause().getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(3, counter);
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public static class MyRecipientBean {
+
+        @org.apache.camel.RecipientList(stopOnException = true)
+        public String sendSomewhere(Exchange exchange) {
+            return "mock:foo,mock:bar,bean:fail,mock:baz";
+        }
+    }
+
+    public static class MyFailBean {
+
+        public String doSomething(Exchange exchange) throws Exception {
+            counter++;
+            assertEquals("bean://fail", exchange.getProperty(Exchange.TO_ENDPOINT, String.class));
+            throw new IllegalArgumentException("Damn");
+        }
+    }
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java?rev=890241&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java Mon Dec 14 09:25:50 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+
+/**
+ * @version $Revision$
+ */
+public class RecipientListParallelFineGrainedErrorHandlingTest extends ContextTestSupport {
+
+    private static int counter;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("fail", new MyFailBean());
+        return jndi;
+    }
+
+    public void testRecipientListOk() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class).maximumRedeliveries(2);
+
+                from("direct:start")
+                    .to("mock:a")
+                    .recipientList(header("foo")).stopOnException().parallelProcessing();
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:baz").expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", "mock:foo,mock:bar,mock:baz");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testRecipientListError() throws Exception {
+        counter = 0;
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class).maximumRedeliveries(2);
+
+                from("direct:start")
+                    .to("mock:a")
+                    .recipientList(header("foo")).stopOnException().parallelProcessing();
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:baz").expectedMessageCount(1);
+
+        try {
+            template.sendBodyAndHeader("direct:start", "Hello World", "foo", "mock:foo,mock:bar,bean:fail,mock:baz");
+            fail("Should throw exception");
+        } catch (Exception e) {
+            // expected
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(3, counter);
+    }
+
+    public void testRecipientListAsBeanError() throws Exception {
+        counter = 0;
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setTracing(true);
+
+                onException(Exception.class).maximumRedeliveries(2);
+
+                from("direct:start")
+                    .to("mock:a")
+                    .bean(MyRecipientBean.class);
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:baz").expectedMessageCount(1);
+
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should throw exception");
+        } catch (CamelExecutionException e) {
+            // expected
+            assertIsInstanceOf(ExecutionException.class, e.getCause());
+            assertIsInstanceOf(CamelExchangeException.class, e.getCause().getCause());
+            assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause().getCause());
+            assertEquals("Damn", e.getCause().getCause().getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(3, counter);
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public static class MyRecipientBean {
+
+        @org.apache.camel.RecipientList(stopOnException = true, parallelProcessoing = true)
+        public String sendSomewhere(Exchange exchange) {
+            return "mock:foo,mock:bar,bean:fail,mock:baz";
+        }
+    }
+
+    public static class MyFailBean {
+
+        public String doSomething(Exchange exchange) throws Exception {
+            counter++;
+            assertEquals("bean://fail", exchange.getProperty(Exchange.TO_ENDPOINT, String.class));
+            throw new IllegalArgumentException("Damn");
+        }
+    }
+}
\ No newline at end of file

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java Mon Dec 14 09:25:50 2009
@@ -53,7 +53,7 @@
             ExecutionException ee = assertIsInstanceOf(ExecutionException.class, e.getCause());
             CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, ee.getCause());
             assertTrue(cause.getMessage().startsWith("Parallel processing failed for number "));
-            assertTrue(cause.getMessage().endsWith("on the exchange: Exchange[Message: Kaboom]"));
+            assertTrue(cause.getMessage().contains("on the exchange: Exchange[Message: Kaboom]"));
             assertEquals("Forced", cause.getCause().getMessage());
         }
 

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java?rev=890241&r1=890240&r2=890241&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java Mon Dec 14 09:25:50 2009
@@ -48,7 +48,7 @@
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
             CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertEquals("Sequential processing failed for number 1 on the exchange: Exchange[Message: Kaboom]", cause.getMessage());
+            assertTrue(cause.getMessage().startsWith("Sequential processing failed for number 1 on the exchange: Exchange[Message: Kaboom]"));
             assertEquals("Forced", cause.getCause().getMessage());
         }
 


Reply via email to