Author: davsclaus
Date: Thu Dec 10 15:30:45 2009
New Revision: 889295

URL: http://svn.apache.org/viewvc?rev=889295&view=rev
Log:
CAMEL-2278: Multicast is now using fine grained error handling to avoid 
redelivery from scratch but does not on the particular failed processor instead.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
   (with props)
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ErrorHandlerSupportTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=889295&r1=889294&r2=889295&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
 Thu Dec 10 15:30:45 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.model;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
@@ -26,9 +27,11 @@
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Processor;
+import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -126,7 +129,7 @@
         return this;
     }    
         
-    protected Processor createCompositeProcessor(RouteContext routeContext, 
List<Processor> list) {
+    protected Processor createCompositeProcessor(RouteContext routeContext, 
List<Processor> list) throws Exception {
         if (strategyRef != null) {
             aggregationStrategy = routeContext.lookup(strategyRef, 
AggregationStrategy.class);
         }
@@ -137,8 +140,16 @@
         if (executorServiceRef != null) {
             executorService = routeContext.lookup(executorServiceRef, 
ExecutorService.class);
         }
-        return new MulticastProcessor(list, aggregationStrategy, 
isParallelProcessing(), executorService,
-                isStreaming(), isStopOnException());
+
+        // wrap list of processors in error handlers so we have fine grained 
error handling
+        List<Processor> processors = new ArrayList<Processor>(list.size());
+        for (Processor output : list) {
+            Processor errorHandler = wrapInErrorHandler(routeContext, output);
+            processors.add(errorHandler);
+        }
+
+        return new MulticastProcessor(processors, aggregationStrategy, 
isParallelProcessing(), executorService,
+                                      isStreaming(), isStopOnException());
     }
 
     public AggregationStrategy getAggregationStrategy() {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=889295&r1=889294&r2=889295&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 Thu Dec 10 15:30:45 2009
@@ -180,25 +180,42 @@
         if (defn instanceof TryDefinition || defn instanceof CatchDefinition 
|| defn instanceof FinallyDefinition) {
             // do not use error handler for try .. catch .. finally blocks as 
it will handle errors itself
             return channel;
+        } else if (defn instanceof MulticastDefinition) {
+            // do not use error handler for multicast based as it offers fine 
grained error handlers for its outputs
+            return channel;
         } else {
             // regular definition so add the error handler
             Processor output = channel.getOutput();
-            // create error handler
-            ErrorHandlerBuilder builder = getErrorHandlerBuilder();
-            Processor errorHandler = builder.createErrorHandler(routeContext, 
output);
+            Processor errorHandler = wrapInErrorHandler(routeContext, output);
             // set error handler on channel
             channel.setErrorHandler(errorHandler);
 
-            // invoke lifecycles so we can manage this error handler builder
-            for (LifecycleStrategy strategy : 
routeContext.getCamelContext().getLifecycleStrategies()) {
-                strategy.onErrorHandlerAdd(routeContext, errorHandler, 
builder);
-            }
-
             return channel;
         }
     }
 
     /**
+     * Wraps the given output in an error handler
+     *
+     * @param routeContext the route context
+     * @param output the output
+     * @return the output wrapped with the error handler
+     * @throws Exception can be thrown
+     */
+    protected Processor wrapInErrorHandler(RouteContext routeContext, 
Processor output) throws Exception {
+        // create error handler
+        ErrorHandlerBuilder builder = getErrorHandlerBuilder();
+        Processor errorHandler = builder.createErrorHandler(routeContext, 
output);
+
+        // invoke lifecycles so we can manage this error handler builder
+        for (LifecycleStrategy strategy : 
routeContext.getCamelContext().getLifecycleStrategies()) {
+            strategy.onErrorHandlerAdd(routeContext, errorHandler, builder);
+        }
+
+        return errorHandler;
+    }
+
+    /**
      * Adds the given list of interceptors to the channel.
      *
      * @param routeContext  the route context
@@ -243,14 +260,14 @@
      * Creates a new instance of some kind of composite processor which 
defaults
      * to using a {...@link Pipeline} but derived classes could change the 
behaviour
      */
-    protected Processor createCompositeProcessor(RouteContext routeContext, 
List<Processor> list) {
+    protected Processor createCompositeProcessor(RouteContext routeContext, 
List<Processor> list) throws Exception {
         return new Pipeline(list);
     }
 
     /**
      * Creates a new instance of the {...@link Channel}.
      */
-    protected Channel createChannel(RouteContext routeContext) {
+    protected Channel createChannel(RouteContext routeContext) throws 
Exception {
         return new DefaultChannel();
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java?rev=889295&r1=889294&r2=889295&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
 Thu Dec 10 15:30:45 2009
@@ -104,4 +104,9 @@
      */
     public abstract boolean supportTransacted();
 
+    /**
+     * Gets the output
+     */
+    public abstract Processor getOutput();
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java?rev=889295&r1=889294&r2=889295&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java 
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java Thu 
Dec 10 15:30:45 2009
@@ -27,6 +27,7 @@
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.ErrorHandlerSupport;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.PredicateAssertHelper;
 import org.apache.commons.logging.Log;
@@ -377,6 +378,8 @@
                 return (Channel) processor;
             } else if (processor instanceof DelegateProcessor) {
                 processor = ((DelegateProcessor)processor).getProcessor();
+            } else if (processor instanceof ErrorHandlerSupport) {
+                processor = ((ErrorHandlerSupport)processor).getOutput();
             } else {
                 return null;
             }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ErrorHandlerSupportTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ErrorHandlerSupportTest.java?rev=889295&r1=889294&r2=889295&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ErrorHandlerSupportTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ErrorHandlerSupportTest.java
 Thu Dec 10 15:30:45 2009
@@ -22,6 +22,7 @@
 import junit.framework.TestCase;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.model.OnExceptionDefinition;
 
 public class ErrorHandlerSupportTest extends TestCase {
@@ -91,6 +92,10 @@
             return false;
         }
 
+        public Processor getOutput() {
+            return null;
+        }
+
         public void process(Exchange exchange) throws Exception {
         }
     }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java?rev=889295&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
 Thu Dec 10 15:30:45 2009
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class MulticastFineGrainedErrorHandlingTest extends ContextTestSupport {
+
+    public void testMulticastOk() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class).maximumRedeliveries(2);
+
+                from("direct:start")
+                    .multicast().stopOnException()
+                    .to("mock:foo", "mock:bar", "mock:baz");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:baz").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMulticastError() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class).maximumRedeliveries(2);
+
+                from("direct:start")
+                    .multicast().stopOnException()
+                    .to("mock:foo", "mock:bar").throwException(new 
IllegalArgumentException("Damn")).to("mock:baz");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:baz").expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should throw exception");
+        } catch (Exception e) {
+            // expected
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastFineGrainedErrorHandlingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to