Author: davsclaus Date: Mon Dec 14 09:25:50 2009 New Revision: 890241 URL: http://svn.apache.org/viewvc?rev=890241&view=rev Log: CAMEL-2278: Multicast and recipient list now use fine-grained error handling on the output side to avoid redelivering everything all over again. Redelivery now happens only on the individual node that failed.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelFineGrainedErrorHandlingTest.java - copied, changed from r890206, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExchangeException.java camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExchangeException.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExchangeException.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExchangeException.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExchangeException.java Mon Dec 14 09:25:50 2009 @@ -32,7 +32,7 @@ } public CamelExchangeException(String message, Exchange exchange, Throwable cause) { - super(createMessage(message, exchange), cause); + super(createMessage(message, exchange, cause), cause); this.exchange = exchange; } @@ -46,4 +46,8 @@ protected static String createMessage(String message, Exchange exchange) { return message + " on the exchange: " + exchange; } + + protected static String createMessage(String message, Exchange exchange, Throwable cause) { + return createMessage(message, exchange) + ". 
Cause by: [" + cause.getClass().getName() + " - " + cause.getMessage() + "]"; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java Mon Dec 14 09:25:50 2009 @@ -114,9 +114,16 @@ Processor getNextProcessor(); /** - * Gets the defintion of the next processor + * Gets the definition of the next processor * * @return the processor definition */ ProcessorDefinition<?> getProcessorDefinition(); + + /** + * Gets the {@link RouteContext} + * + * @return the route context + */ + RouteContext getRouteContext(); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Mon Dec 14 09:25:50 2009 @@ -94,6 +94,7 @@ String ROUTE_STOP = "CamelRouteStop"; String REDELIVERED = "CamelRedelivered"; String REDELIVERY_COUNTER = "CamelRedeliveryCounter"; + String REDELIVERY_EXHAUSTED = "CamelRedeliveryExhausted"; String ROLLBACK_ONLY = "CamelRollbackOnly"; String ROLLBACK_ONLY_LAST = "CamelRollbackOnlyLast"; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=890241&r1=890240&r2=890241&view=diff 
============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Mon Dec 14 09:25:50 2009 @@ -144,7 +144,7 @@ Processor processor = Pipeline.newInstance(eventDrivenProcessors); // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW - Processor unitOfWorkProcessor = new UnitOfWorkProcessor(processor); + Processor unitOfWorkProcessor = new UnitOfWorkProcessor(this, processor); Processor target = unitOfWorkProcessor; // and then optionally add route policy processor if a custom policy is set Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Mon Dec 14 09:25:50 2009 @@ -24,6 +24,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Service; +import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.TracedRouteNodes; import org.apache.camel.spi.UnitOfWork; @@ -45,6 +46,7 @@ private Message originalInMessage; private final TracedRouteNodes tracedRouteNodes; private Set<Object> transactedBy; + private RouteContext routeContext; public DefaultUnitOfWork(Exchange exchange) { tracedRouteNodes = new DefaultTracedRouteNodes(); @@ -85,6 +87,7 @@ transactedBy.clear(); } originalInMessage = null; + routeContext = null; } public synchronized void addSynchronization(Synchronization 
synchronization) { @@ -177,6 +180,14 @@ getTransactedBy().remove(transactionDefinition); } + public RouteContext getRouteContext() { + return routeContext; + } + + public void setRouteContext(RouteContext routeContext) { + this.routeContext = routeContext; + } + private Set<Object> getTransactedBy() { if (transactedBy == null) { transactedBy = new LinkedHashSet<Object>(); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Mon Dec 14 09:25:50 2009 @@ -16,10 +16,8 @@ */ package org.apache.camel.model; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; - import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; @@ -27,11 +25,9 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.Processor; -import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; -import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.RouteContext; /** @@ -141,14 +137,7 @@ executorService = routeContext.lookup(executorServiceRef, ExecutorService.class); } - // wrap list of processors in error handlers so we have fine grained error handling - List<Processor> processors = new ArrayList<Processor>(list.size()); - for (Processor output : list) { - Processor errorHandler = 
wrapInErrorHandler(routeContext, output); - processors.add(errorHandler); - } - - return new MulticastProcessor(processors, aggregationStrategy, isParallelProcessing(), executorService, + return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), executorService, isStreaming(), isStopOnException()); } @@ -193,4 +182,4 @@ this.executorService = executorService; } -} \ No newline at end of file +} Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Mon Dec 14 09:25:50 2009 @@ -74,7 +74,7 @@ Processor childProcessor = createOutputsProcessor(routeContext); // wrap the on completion route in a unit of work processor - childProcessor = new UnitOfWorkProcessor(childProcessor); + childProcessor = new UnitOfWorkProcessor(routeContext, childProcessor); Predicate when = null; if (onWhen != null) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Mon Dec 14 09:25:50 2009 @@ -180,13 +180,13 @@ if (defn instanceof TryDefinition || defn instanceof CatchDefinition || defn instanceof FinallyDefinition) { // do not use 
error handler for try .. catch .. finally blocks as it will handle errors itself return channel; - } else if (defn instanceof MulticastDefinition) { - // do not use error handler for multicast based as it offers fine grained error handlers for its outputs + } else if (defn instanceof MulticastDefinition || defn instanceof RecipientListDefinition) { + // do not use error handler for multicast or recipientlist based as it offers fine grained error handlers for its outputs return channel; } else { // regular definition so add the error handler Processor output = channel.getOutput(); - Processor errorHandler = wrapInErrorHandler(routeContext, output); + Processor errorHandler = wrapInErrorHandler(routeContext, getErrorHandlerBuilder(), output); // set error handler on channel channel.setErrorHandler(errorHandler); @@ -200,11 +200,10 @@ * @param routeContext the route context * @param output the output * @return the output wrapped with the error handler - * @throws Exception can be thrown + * @throws Exception can be thrown if failed to create error handler builder */ - protected Processor wrapInErrorHandler(RouteContext routeContext, Processor output) throws Exception { + protected Processor wrapInErrorHandler(RouteContext routeContext, ErrorHandlerBuilder builder, Processor output) throws Exception { // create error handler - ErrorHandlerBuilder builder = getErrorHandlerBuilder(); Processor errorHandler = builder.createErrorHandler(routeContext, output); // invoke lifecycles so we can manage this error handler builder @@ -215,6 +214,8 @@ return errorHandler; } + + /** * Adds the given list of interceptors to the channel. 
* Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Mon Dec 14 09:25:50 2009 @@ -59,7 +59,7 @@ Processor childProcessor = routeContext.createProcessor(this); // wrap it in a unit of work so the route that comes next is also done in a unit of work - UnitOfWorkProcessor uow = new UnitOfWorkProcessor(childProcessor); + UnitOfWorkProcessor uow = new UnitOfWorkProcessor(routeContext, childProcessor); return new ThreadsProcessor(uow, executorService, waitForTaskToComplete); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Mon Dec 14 09:25:50 2009 @@ -98,7 +98,7 @@ Processor childProcessor = routeContext.createProcessor(this); // wrap it in a unit of work so the route that comes next is also done in a unit of work - UnitOfWorkProcessor uow = new UnitOfWorkProcessor(childProcessor); + UnitOfWorkProcessor uow = new UnitOfWorkProcessor(routeContext, childProcessor); // create async processor Endpoint endpoint = resolveEndpoint(routeContext); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Mon Dec 14 09:25:50 2009 @@ -61,6 +61,7 @@ private ProcessorDefinition<?> definition; private ProcessorDefinition<?> childDefinition; private CamelContext camelContext; + private RouteContext routeContext; public List<Processor> next() { List<Processor> answer = new ArrayList<Processor>(1); @@ -128,6 +129,10 @@ this.childDefinition = childDefinition; } + public RouteContext getRouteContext() { + return routeContext; + } + @Override protected void doStart() throws Exception { ServiceHelper.startServices(errorHandler, output); @@ -139,6 +144,7 @@ } public void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception { + this.routeContext = routeContext; this.definition = outputDefinition; this.camelContext = routeContext.getCamelContext(); @@ -201,6 +207,11 @@ } public void process(Exchange exchange) throws Exception { + if (exchange.getUnitOfWork() != null) { + // keep route context up to date + exchange.getUnitOfWork().setRouteContext(routeContext); + } + Processor processor = getOutput(); if (processor != null && continueProcessing(exchange)) { processor.process(exchange); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Dec 14 09:25:50 2009 @@ -30,12 +30,15 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.CamelExchangeException; +import org.apache.camel.Channel; import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.impl.ServiceSupport; import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.TracedRouteNodes; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ServiceHelper; @@ -80,12 +83,13 @@ } } - private final Collection<Processor> processors; + private Collection<Processor> processors; private final AggregationStrategy aggregationStrategy; private final boolean isParallelProcessing; private final boolean streaming; private final boolean stopOnException; private ExecutorService executorService; + private Channel channel; public MulticastProcessor(Collection<Processor> processors) { this(processors, null); @@ -122,18 +126,36 @@ return "multicast"; } + public Channel getChannel() { + return channel; + } + + public void setChannel(Channel channel) { + this.channel = channel; + } + public void process(Exchange exchange) throws Exception { final AtomicExchange result = new AtomicExchange(); final Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange); - if (isParallelProcessing()) { - doProcessParallel(result, pairs, isStreaming()); - } else { - doProcessSequential(result, pairs); - } + // multicast uses fine grained error handling on the output processors + // so use try .. 
catch to cater for this + try { + if (isParallelProcessing()) { + doProcessParallel(result, pairs, isStreaming()); + } else { + doProcessSequential(result, pairs); + } - if (result.get() != null) { - ExchangeHelper.copyResults(exchange, result.get()); + if (result.get() != null) { + ExchangeHelper.copyResults(exchange, result.get()); + } + } catch (Exception e) { + // multicast uses error handling on its output processors and they have tried to redeliver + // so we shall signal back to the other error handlers that we are exhausted and they should not + // also try to redeliver as we will then do that twice + exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); + exchange.setException(e); } } @@ -237,6 +259,19 @@ // set property which endpoint we send to setToEndpoint(exchange, producer); + if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext() != null) { + // wrap the producer in error handler so we have fine grained error handling on + // the output side instead of the input side + // this is needed to support redelivery on that output alone and not doing redelivery + // for the entire multicast block again which will start from scratch again + RouteContext routeContext = exchange.getUnitOfWork().getRouteContext(); + ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder(); + + // create error handler (create error handler directly to keep it light weight, + // instead of using ProcessorDefinitionHelper.wrapInErrorHandler) + producer = builder.createErrorHandler(routeContext, producer); + } + // let the producer process it producer.process(exchange); } catch (Exception e) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=890241&r1=890240&r2=890241&view=diff 
============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Mon Dec 14 09:25:50 2009 @@ -231,7 +231,10 @@ protected boolean isDone(Exchange exchange) throws Exception { // only done if the exchange hasn't failed // and it has not been handled by the failure processor - return exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange); + // or we are exhausted + return exchange.getException() == null + || ExchangeHelper.isFailureHandled(exchange) + || ExchangeHelper.isRedelieryExhausted(exchange); } /** Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java Mon Dec 14 09:25:50 2009 @@ -19,6 +19,8 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultUnitOfWork; +import org.apache.camel.spi.RouteContext; + import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException; /** @@ -27,10 +29,17 @@ */ public final class UnitOfWorkProcessor extends DelegateProcessor { + private final RouteContext routeContext; + public UnitOfWorkProcessor(Processor processor) { + this(null, processor); + } + + public UnitOfWorkProcessor(RouteContext routeContext, Processor processor) { super(processor); + this.routeContext = routeContext; } - + @Override public String toString() { return "UnitOfWork(" + processor + 
")"; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java Mon Dec 14 09:25:50 2009 @@ -94,7 +94,6 @@ return; } - // interceptor will also trace routes supposed only for TraceEvents so we need to skip // logging TraceEvents to avoid infinite looping if (exchange.getProperty(Exchange.TRACE_EVENT, Boolean.class) != null) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java Mon Dec 14 09:25:50 2009 @@ -110,4 +110,24 @@ * @param transactionDefinition the transaction definition */ void endTransactedBy(Object transactionDefinition); + + /** + * Gets the {@link RouteContext} that this {@link UnitOfWork} currently is being routed through. + * <p/> + * Notice that an {@link Exchange} can be routed through multiple routes and thus the + * {@link org.apache.camel.spi.RouteContext} can change over time. + * + * @return the route context + */ + RouteContext getRouteContext(); + + /** + * Gets the {@link RouteContext} that this {@link UnitOfWork} currently is being routed through. 
+ * <p/> + * Notice that an {@link Exchange} can be routed through multiple routes and thus the + * {@link org.apache.camel.spi.RouteContext} can change over time. + * + * @param routeContext the route context + */ + void setRouteContext(RouteContext routeContext); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Mon Dec 14 09:25:50 2009 @@ -426,6 +426,11 @@ exchange.setException(null); } + public static boolean isRedelieryExhausted(Exchange exchange) { + Boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.class); + return exhausted != null && exhausted; + } + /** * Extracts the body from the given exchange. 
* <p/> Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java Mon Dec 14 09:25:50 2009 @@ -72,7 +72,7 @@ fail("Should thrown an exception"); } catch (CamelExecutionException e) { CamelExchangeException ee = assertIsInstanceOf(CamelExchangeException.class, e.getCause()); - assertEquals("Sequential processing failed for number 2 on the exchange: Exchange[Message: Kaboom]", ee.getMessage()); + assertTrue(ee.getMessage().startsWith("Sequential processing failed for number 2 on the exchange: Exchange[Message: Kaboom]")); RollbackExchangeException re = assertIsInstanceOf(RollbackExchangeException.class, ee.getCause()); assertEquals("Intended rollback on the exchange: Exchange[Message: Kaboom]", re.getMessage()); } @@ -94,7 +94,7 @@ fail("Should thrown an exception"); } catch (CamelExecutionException e) { CamelExchangeException ee = assertIsInstanceOf(CamelExchangeException.class, e.getCause()); - assertEquals("Sequential processing failed for number 3 on the exchange: Exchange[Message: Kaboom]", ee.getMessage()); + assertTrue(ee.getMessage().startsWith("Sequential processing failed for number 3 on the exchange: Exchange[Message: Kaboom]")); RollbackExchangeException re = assertIsInstanceOf(RollbackExchangeException.class, ee.getCause()); assertEquals("Intended rollback on the exchange: Exchange[Message: Kaboom]", re.getMessage()); } Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java Mon Dec 14 09:25:50 2009 @@ -31,12 +31,14 @@ onException(Exception.class).maximumRedeliveries(2); from("direct:start") + .to("mock:a") .multicast().stopOnException() .to("mock:foo", "mock:bar", "mock:baz"); } }); context.start(); + getMockEndpoint("mock:a").expectedMessageCount(1); getMockEndpoint("mock:foo").expectedMessageCount(1); getMockEndpoint("mock:bar").expectedMessageCount(1); getMockEndpoint("mock:baz").expectedMessageCount(1); @@ -53,12 +55,14 @@ onException(Exception.class).maximumRedeliveries(2); from("direct:start") + .to("mock:a") .multicast().stopOnException() .to("mock:foo", "mock:bar").throwException(new IllegalArgumentException("Damn")).to("mock:baz"); } }); context.start(); + getMockEndpoint("mock:a").expectedMessageCount(1); getMockEndpoint("mock:foo").expectedMessageCount(1); getMockEndpoint("mock:bar").expectedMessageCount(1); getMockEndpoint("mock:baz").expectedMessageCount(0); Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelFineGrainedErrorHandlingTest.java (from r890206, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java) URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelFineGrainedErrorHandlingTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelFineGrainedErrorHandlingTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java&r1=890206&r2=890241&rev=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelFineGrainedErrorHandlingTest.java Mon Dec 14 09:25:50 2009 @@ -22,7 +22,7 @@ /** * @version $Revision$ */ -public class MulticastFineGrainedErrorHandlingTest extends ContextTestSupport { +public class MulticastParallelFineGrainedErrorHandlingTest extends ContextTestSupport { public void testMulticastOk() throws Exception { context.addRoutes(new RouteBuilder() { @@ -31,12 +31,14 @@ onException(Exception.class).maximumRedeliveries(2); from("direct:start") - .multicast().stopOnException() + .to("mock:a") + .multicast().stopOnException().parallelProcessing() .to("mock:foo", "mock:bar", "mock:baz"); } }); context.start(); + getMockEndpoint("mock:a").expectedMessageCount(1); getMockEndpoint("mock:foo").expectedMessageCount(1); getMockEndpoint("mock:bar").expectedMessageCount(1); getMockEndpoint("mock:baz").expectedMessageCount(1); @@ -53,15 +55,17 @@ onException(Exception.class).maximumRedeliveries(2); from("direct:start") - .multicast().stopOnException() + .to("mock:a") + .multicast().stopOnException().parallelProcessing() .to("mock:foo", "mock:bar").throwException(new IllegalArgumentException("Damn")).to("mock:baz"); } }); context.start(); + getMockEndpoint("mock:a").expectedMessageCount(1); getMockEndpoint("mock:foo").expectedMessageCount(1); 
getMockEndpoint("mock:bar").expectedMessageCount(1); - getMockEndpoint("mock:baz").expectedMessageCount(0); + getMockEndpoint("mock:baz").expectedMessageCount(1); try { template.sendBody("direct:start", "Hello World"); @@ -77,4 +81,4 @@ public boolean isUseRouteBuilder() { return false; } -} +} \ No newline at end of file Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java Mon Dec 14 09:25:50 2009 @@ -57,7 +57,7 @@ ExecutionException ee = assertIsInstanceOf(ExecutionException.class, e.getCause()); CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, ee.getCause()); assertTrue(cause.getMessage().startsWith("Parallel processing failed for number ")); - assertTrue(cause.getMessage().endsWith("on the exchange: Exchange[Message: Kaboom]")); + assertTrue(cause.getMessage().contains("on the exchange: Exchange[Message: Kaboom]")); assertEquals("Forced", cause.getCause().getMessage()); } @@ -96,4 +96,4 @@ } }; } -} \ No newline at end of file +} Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java 
(original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java Mon Dec 14 09:25:50 2009 @@ -51,7 +51,7 @@ fail("Should thrown an exception"); } catch (CamelExecutionException e) { CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause()); - assertEquals("Sequential processing failed for number 1 on the exchange: Exchange[Message: Kaboom]", cause.getMessage()); + assertTrue(cause.getMessage().startsWith("Sequential processing failed for number 1 on the exchange: Exchange[Message: Kaboom]")); assertEquals("Forced", cause.getCause().getMessage()); } @@ -87,4 +87,4 @@ } } } -} \ No newline at end of file +} Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java?rev=890241&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java Mon Dec 14 09:25:50 2009 @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; + +/** + * @version $Revision$ + */ +public class RecipientListFineGrainedErrorHandlingTest extends ContextTestSupport { + + private static int counter; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("fail", new MyFailBean()); + return jndi; + } + + public void testRecipientListOk() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class).maximumRedeliveries(2); + + from("direct:start") + .to("mock:a") + .recipientList(header("foo")).stopOnException(); + } + }); + context.start(); + + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + getMockEndpoint("mock:baz").expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", "mock:foo,mock:bar,mock:baz"); + + assertMockEndpointsSatisfied(); + } + + public void testRecipientListError() throws Exception { + counter = 0; + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class).maximumRedeliveries(2); + + 
from("direct:start") + .to("mock:a") + .recipientList(header("foo")).stopOnException(); + } + }); + context.start(); + + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + getMockEndpoint("mock:baz").expectedMessageCount(0); + + try { + template.sendBodyAndHeader("direct:start", "Hello World", "foo", "mock:foo,mock:bar,bean:fail,mock:baz"); + fail("Should throw exception"); + } catch (Exception e) { + // expected + } + + assertMockEndpointsSatisfied(); + + assertEquals(3, counter); + } + + public void testRecipientListAsBeanError() throws Exception { + counter = 0; + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setTracing(true); + + onException(Exception.class).maximumRedeliveries(2); + + from("direct:start") + .to("mock:a") + .bean(MyRecipientBean.class); + } + }); + context.start(); + + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + getMockEndpoint("mock:baz").expectedMessageCount(0); + + try { + template.sendBody("direct:start", "Hello World"); + fail("Should throw exception"); + } catch (CamelExecutionException e) { + // expected + assertIsInstanceOf(CamelExchangeException.class, e.getCause()); + assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause()); + assertEquals("Damn", e.getCause().getCause().getMessage()); + } + + assertMockEndpointsSatisfied(); + + assertEquals(3, counter); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public static class MyRecipientBean { + + @org.apache.camel.RecipientList(stopOnException = true) + public String sendSomewhere(Exchange exchange) { + return "mock:foo,mock:bar,bean:fail,mock:baz"; + } + } + + public static class MyFailBean { + + public String doSomething(Exchange exchange) throws 
Exception { + counter++; + assertEquals("bean://fail", exchange.getProperty(Exchange.TO_ENDPOINT, String.class)); + throw new IllegalArgumentException("Damn"); + } + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListFineGrainedErrorHandlingTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java?rev=890241&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java Mon Dec 14 09:25:50 2009 @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.concurrent.ExecutionException; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; + +/** + * @version $Revision$ + */ +public class RecipientListParallelFineGrainedErrorHandlingTest extends ContextTestSupport { + + private static int counter; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("fail", new MyFailBean()); + return jndi; + } + + public void testRecipientListOk() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class).maximumRedeliveries(2); + + from("direct:start") + .to("mock:a") + .recipientList(header("foo")).stopOnException().parallelProcessing(); + } + }); + context.start(); + + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + getMockEndpoint("mock:baz").expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", "mock:foo,mock:bar,mock:baz"); + + assertMockEndpointsSatisfied(); + } + + public void testRecipientListError() throws Exception { + counter = 0; + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class).maximumRedeliveries(2); + + from("direct:start") + .to("mock:a") + .recipientList(header("foo")).stopOnException().parallelProcessing(); + } + }); + context.start(); + + getMockEndpoint("mock:a").expectedMessageCount(1); + 
getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + getMockEndpoint("mock:baz").expectedMessageCount(1); + + try { + template.sendBodyAndHeader("direct:start", "Hello World", "foo", "mock:foo,mock:bar,bean:fail,mock:baz"); + fail("Should throw exception"); + } catch (Exception e) { + // expected + } + + assertMockEndpointsSatisfied(); + + assertEquals(3, counter); + } + + public void testRecipientListAsBeanError() throws Exception { + counter = 0; + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setTracing(true); + + onException(Exception.class).maximumRedeliveries(2); + + from("direct:start") + .to("mock:a") + .bean(MyRecipientBean.class); + } + }); + context.start(); + + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + getMockEndpoint("mock:baz").expectedMessageCount(1); + + try { + template.sendBody("direct:start", "Hello World"); + fail("Should throw exception"); + } catch (CamelExecutionException e) { + // expected + assertIsInstanceOf(ExecutionException.class, e.getCause()); + assertIsInstanceOf(CamelExchangeException.class, e.getCause().getCause()); + assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause().getCause()); + assertEquals("Damn", e.getCause().getCause().getCause().getMessage()); + } + + assertMockEndpointsSatisfied(); + + assertEquals(3, counter); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public static class MyRecipientBean { + + @org.apache.camel.RecipientList(stopOnException = true, parallelProcessoing = true) + public String sendSomewhere(Exchange exchange) { + return "mock:foo,mock:bar,bean:fail,mock:baz"; + } + } + + public static class MyFailBean { + + public String doSomething(Exchange exchange) throws Exception { + counter++; + 
assertEquals("bean://fail", exchange.getProperty(Exchange.TO_ENDPOINT, String.class)); + throw new IllegalArgumentException("Damn"); + } + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java Mon Dec 14 09:25:50 2009 @@ -53,7 +53,7 @@ ExecutionException ee = assertIsInstanceOf(ExecutionException.class, e.getCause()); CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, ee.getCause()); assertTrue(cause.getMessage().startsWith("Parallel processing failed for number ")); - assertTrue(cause.getMessage().endsWith("on the exchange: Exchange[Message: Kaboom]")); + assertTrue(cause.getMessage().contains("on the exchange: Exchange[Message: Kaboom]")); assertEquals("Forced", cause.getCause().getMessage()); } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java?rev=890241&r1=890240&r2=890241&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java Mon Dec 14 09:25:50 2009 @@ -48,7 +48,7 @@ fail("Should thrown an exception"); } catch (CamelExecutionException e) { CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause()); - assertEquals("Sequential processing failed for number 1 on the exchange: Exchange[Message: Kaboom]", cause.getMessage()); + assertTrue(cause.getMessage().startsWith("Sequential processing failed for number 1 on the exchange: Exchange[Message: Kaboom]")); assertEquals("Forced", cause.getCause().getMessage()); }