Repository: camel
Updated Branches:
  refs/heads/master f53890482 -> eeb09c827


CAMEL-8393: Fixed Routing Slip and Dynamic Router to not evaluate expression 
again during each redelivery attempt from Error Handler if routing caused an 
exception.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0163fe44
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0163fe44
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0163fe44

Branch: refs/heads/master
Commit: 0163fe44840c014f293e6790d6d60858191733be
Parents: f538904
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Sep 16 13:03:11 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Sep 16 13:03:11 2015 +0200

----------------------------------------------------------------------
 .../org/apache/camel/processor/RoutingSlip.java |  76 +++++++++++-
 .../processor/DynamicRouterOnExceptionTest.java | 121 +++++++++++++++++++
 .../processor/RoutingSlipOnExceptionTest.java   |  38 ++++++
 3 files changed, 233 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0163fe44/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java 
b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index a8ca7e0..c20742c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -17,17 +17,21 @@
 package org.apache.camel.processor;
 
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.AsyncProducerCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Message;
+import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.Traceable;
 import org.apache.camel.builder.ExpressionBuilder;
@@ -36,9 +40,12 @@ import org.apache.camel.impl.EmptyProducerCache;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -67,6 +74,20 @@ public class RoutingSlip extends ServiceSupport implements 
AsyncProcessor, Trace
     protected Expression expression;
     protected String uriDelimiter;
     protected final CamelContext camelContext;
+    private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> 
errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>();
+
+    /**
+     * Class that represents prepared fine grained error handlers when 
processing routingslip/dynamic-router exchanges
+     * <p/>
+     * This is similar to how multicast processor does.
+     */
+    static final class PreparedErrorHandler extends 
KeyValueHolder<RouteContext, Processor> {
+
+        public PreparedErrorHandler(RouteContext key, Processor value) {
+            super(key, value);
+        }
+
+    }
 
     /**
      * The iterator to be used for retrieving the next routing slip(s) to be 
used.
@@ -304,6 +325,49 @@ public class RoutingSlip extends ServiceSupport implements 
AsyncProcessor, Trace
         return copy;
     }
 
+    protected AsyncProcessor createErrorHandler(RouteContext routeContext, 
Exchange exchange, AsyncProcessor processor) {
+        AsyncProcessor answer = processor;
+
+        boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, 
false, boolean.class);
+
+        // do not wrap in error handler if we are inside a try block
+        if (!tryBlock && routeContext != null) {
+            // wrap the producer in error handler so we have fine grained 
error handling on
+            // the output side instead of the input side
+            // this is needed to support redelivery on that output alone and 
not doing redelivery
+            // for the entire routingslip/dynamic-router block again which 
will start from scratch again
+
+            // create key for cache
+            final PreparedErrorHandler key = new 
PreparedErrorHandler(routeContext, processor);
+
+            // lookup cached first to reuse and preserve memory
+            answer = errorHandlers.get(key);
+            if (answer != null) {
+                log.trace("Using existing error handler for: {}", processor);
+                return answer;
+            }
+
+            log.trace("Creating error handler for: {}", processor);
+            ErrorHandlerFactory builder = 
routeContext.getRoute().getErrorHandlerBuilder();
+            // create error handler (create error handler directly to keep it 
light weight,
+            // instead of using ProcessorDefinition.wrapInErrorHandler)
+            try {
+                answer = (AsyncProcessor) 
builder.createErrorHandler(routeContext, processor);
+
+                // must start the error handler
+                ServiceHelper.startServices(answer);
+
+                // add to cache
+                errorHandlers.putIfAbsent(key, answer);
+
+            } catch (Exception e) {
+                throw ObjectHelper.wrapRuntimeCamelException(e);
+            }
+        }
+
+        return answer;
+    }
+
     protected boolean processExchange(final Endpoint endpoint, final Exchange 
exchange, final Exchange original,
                                       final AsyncCallback callback, final 
RoutingSlipIterator iter) {
 
@@ -313,6 +377,11 @@ public class RoutingSlip extends ServiceSupport implements 
AsyncProcessor, Trace
         boolean sync = producerCache.doInAsyncProducer(endpoint, exchange, 
null, callback, new AsyncProducerCallback() {
             public boolean doInAsyncProducer(Producer producer, AsyncProcessor 
asyncProducer, final Exchange exchange,
                                              ExchangePattern exchangePattern, 
final AsyncCallback callback) {
+
+                // rework error handling to support fine grained error handling
+                RouteContext routeContext = exchange.getUnitOfWork() != null ? 
exchange.getUnitOfWork().getRouteContext() : null;
+                asyncProducer = createErrorHandler(routeContext, exchange, 
asyncProducer);
+
                 // set property which endpoint we send to
                 exchange.setProperty(Exchange.TO_ENDPOINT, 
endpoint.getEndpointUri());
                 exchange.setProperty(Exchange.SLIP_ENDPOINT, 
endpoint.getEndpointUri());
@@ -403,11 +472,14 @@ public class RoutingSlip extends ServiceSupport 
implements AsyncProcessor, Trace
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(producerCache);
+        ServiceHelper.stopServices(producerCache, errorHandlers);
     }
 
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownService(producerCache);
+        ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers);
+
+        // only clear error handlers when shutting down
+        errorHandlers.clear();
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {

http://git-wip-us.apache.org/repos/asf/camel/blob/0163fe44/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterOnExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterOnExceptionTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterOnExceptionTest.java
new file mode 100644
index 0000000..6684112
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterOnExceptionTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class DynamicRouterOnExceptionTest extends ContextTestSupport {
+
+    public void testOk() throws Exception {
+        getMockEndpoint("mock:end").expectedMessageCount(1);
+
+        MockEndpoint route = getMockEndpoint("mock:route");
+        route.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testException() throws Exception {
+        getMockEndpoint("mock:end").expectedMessageCount(1);
+
+        MockEndpoint route = getMockEndpoint("mock:route");
+        route.whenExchangeReceived(1, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.setException(new IllegalArgumentException("Forced"));
+            }
+        });
+        route.whenExchangeReceived(2, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Bye World");
+            }
+        });
+        route.expectedMessageCount(2);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testExceptionTwo() throws Exception {
+        getMockEndpoint("mock:end").expectedMessageCount(2);
+
+        MockEndpoint route = getMockEndpoint("mock:route");
+        route.whenExchangeReceived(1, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.setException(new IllegalArgumentException("Forced"));
+            }
+        });
+        route.whenExchangeReceived(2, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Bye World");
+            }
+        });
+        route.whenExchangeReceived(3, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.setException(new IllegalArgumentException("Forced"));
+            }
+        });
+        route.whenExchangeReceived(4, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Bye World");
+            }
+        });
+        route.expectedMessageCount(4);
+
+        template.sendBody("direct:start", "Hello World");
+        template.sendBody("direct:start", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(IllegalArgumentException.class)
+                    .maximumRedeliveries(5);
+
+                from("direct:start")
+                    .dynamicRouter(method(DynamicRouterOnExceptionTest.class, 
"whereTo"))
+                    .to("mock:end");
+            }
+        };
+    }
+
+    public static String whereTo(Exchange exchange) {
+        Boolean invoked = exchange.getProperty("invoked", Boolean.class);
+        if (invoked == null) {
+            exchange.setProperty("invoked", true);
+            return "mock:route";
+        } else {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0163fe44/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipOnExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipOnExceptionTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipOnExceptionTest.java
new file mode 100644
index 0000000..9f5f6f2
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipOnExceptionTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.builder.RouteBuilder;
+
+public class RoutingSlipOnExceptionTest extends DynamicRouterOnExceptionTest {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(IllegalArgumentException.class)
+                    .maximumRedeliveries(5);
+
+                from("direct:start")
+                    .routingSlip(method(RoutingSlipOnExceptionTest.class, 
"whereTo"))
+                    .to("mock:end");
+            }
+        };
+    }
+
+}

Reply via email to