Author: davsclaus Date: Thu Dec 10 15:30:45 2009 New Revision: 889295 URL: http://svn.apache.org/viewvc?rev=889295&view=rev Log: CAMEL-2278: Multicast now uses fine grained error handling so that redelivery does not start from scratch but is instead performed only on the particular processor that failed.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ErrorHandlerSupportTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=889295&r1=889294&r2=889295&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Thu Dec 10 15:30:45 2009 @@ -16,6 +16,7 @@ */ package org.apache.camel.model; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -26,9 +27,11 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.Processor; +import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; +import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.RouteContext; /** @@ -126,7 +129,7 @@ return this; } - protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) { + protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception { if (strategyRef != null) { aggregationStrategy = 
routeContext.lookup(strategyRef, AggregationStrategy.class); } @@ -137,8 +140,16 @@ if (executorServiceRef != null) { executorService = routeContext.lookup(executorServiceRef, ExecutorService.class); } - return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), executorService, - isStreaming(), isStopOnException()); + + // wrap list of processors in error handlers so we have fine grained error handling + List<Processor> processors = new ArrayList<Processor>(list.size()); + for (Processor output : list) { + Processor errorHandler = wrapInErrorHandler(routeContext, output); + processors.add(errorHandler); + } + + return new MulticastProcessor(processors, aggregationStrategy, isParallelProcessing(), executorService, + isStreaming(), isStopOnException()); } public AggregationStrategy getAggregationStrategy() { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=889295&r1=889294&r2=889295&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Thu Dec 10 15:30:45 2009 @@ -180,25 +180,42 @@ if (defn instanceof TryDefinition || defn instanceof CatchDefinition || defn instanceof FinallyDefinition) { // do not use error handler for try .. catch .. 
finally blocks as it will handle errors itself return channel; + } else if (defn instanceof MulticastDefinition) { + // do not use error handler for multicast based as it offers fine grained error handlers for its outputs + return channel; } else { // regular definition so add the error handler Processor output = channel.getOutput(); - // create error handler - ErrorHandlerBuilder builder = getErrorHandlerBuilder(); - Processor errorHandler = builder.createErrorHandler(routeContext, output); + Processor errorHandler = wrapInErrorHandler(routeContext, output); // set error handler on channel channel.setErrorHandler(errorHandler); - // invoke lifecycles so we can manage this error handler builder - for (LifecycleStrategy strategy : routeContext.getCamelContext().getLifecycleStrategies()) { - strategy.onErrorHandlerAdd(routeContext, errorHandler, builder); - } - return channel; } } /** + * Wraps the given output in an error handler + * + * @param routeContext the route context + * @param output the output + * @return the output wrapped with the error handler + * @throws Exception can be thrown + */ + protected Processor wrapInErrorHandler(RouteContext routeContext, Processor output) throws Exception { + // create error handler + ErrorHandlerBuilder builder = getErrorHandlerBuilder(); + Processor errorHandler = builder.createErrorHandler(routeContext, output); + + // invoke lifecycles so we can manage this error handler builder + for (LifecycleStrategy strategy : routeContext.getCamelContext().getLifecycleStrategies()) { + strategy.onErrorHandlerAdd(routeContext, errorHandler, builder); + } + + return errorHandler; + } + + /** * Adds the given list of interceptors to the channel. 
* * @param routeContext the route context @@ -243,14 +260,14 @@ * Creates a new instance of some kind of composite processor which defaults * to using a {@link Pipeline} but derived classes could change the behaviour */ - protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) { + protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception { return new Pipeline(list); } /** * Creates a new instance of the {@link Channel}. */ - protected Channel createChannel(RouteContext routeContext) { + protected Channel createChannel(RouteContext routeContext) throws Exception { return new DefaultChannel(); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java?rev=889295&r1=889294&r2=889295&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java Thu Dec 10 15:30:45 2009 @@ -104,4 +104,9 @@ */ public abstract boolean supportTransacted(); + /** + * Gets the output + */ + public abstract Processor getOutput(); + } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java?rev=889295&r1=889294&r2=889295&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java Thu Dec 10 15:30:45 2009 @@ -27,6 +27,7 @@ import org.apache.camel.impl.DefaultCamelContext; import 
org.apache.camel.impl.DefaultExchange; import org.apache.camel.processor.DelegateProcessor; +import org.apache.camel.processor.ErrorHandlerSupport; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.PredicateAssertHelper; import org.apache.commons.logging.Log; @@ -377,6 +378,8 @@ return (Channel) processor; } else if (processor instanceof DelegateProcessor) { processor = ((DelegateProcessor)processor).getProcessor(); + } else if (processor instanceof ErrorHandlerSupport) { + processor = ((ErrorHandlerSupport)processor).getOutput(); } else { return null; } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ErrorHandlerSupportTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ErrorHandlerSupportTest.java?rev=889295&r1=889294&r2=889295&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ErrorHandlerSupportTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ErrorHandlerSupportTest.java Thu Dec 10 15:30:45 2009 @@ -22,6 +22,7 @@ import junit.framework.TestCase; import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.model.OnExceptionDefinition; public class ErrorHandlerSupportTest extends TestCase { @@ -91,6 +92,10 @@ return false; } + public Processor getOutput() { + return null; + } + public void process(Exchange exchange) throws Exception { } } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java?rev=889295&view=auto ============================================================================== --- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java Thu Dec 10 15:30:45 2009 @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class MulticastFineGrainedErrorHandlingTest extends ContextTestSupport { + + public void testMulticastOk() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class).maximumRedeliveries(2); + + from("direct:start") + .multicast().stopOnException() + .to("mock:foo", "mock:bar", "mock:baz"); + } + }); + context.start(); + + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + getMockEndpoint("mock:baz").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testMulticastError() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class).maximumRedeliveries(2); + + from("direct:start") + .multicast().stopOnException() + .to("mock:foo", "mock:bar").throwException(new IllegalArgumentException("Damn")).to("mock:baz"); + } + }); + context.start(); + + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + getMockEndpoint("mock:baz").expectedMessageCount(0); + + try { + template.sendBody("direct:start", "Hello World"); + fail("Should throw exception"); + } catch (Exception e) { + // expected + } + + assertMockEndpointsSatisfied(); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date