Repository: camel Updated Branches: refs/heads/master f53890482 -> eeb09c827
CAMEL-8393: Fixed Routing Slip and Dynamic Router to not evaluate expression again during each redelivery attempt from Error Handler if routing caused an exception. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0163fe44 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0163fe44 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0163fe44 Branch: refs/heads/master Commit: 0163fe44840c014f293e6790d6d60858191733be Parents: f538904 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Sep 16 13:03:11 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Sep 16 13:03:11 2015 +0200 ---------------------------------------------------------------------- .../org/apache/camel/processor/RoutingSlip.java | 76 +++++++++++- .../processor/DynamicRouterOnExceptionTest.java | 121 +++++++++++++++++++ .../processor/RoutingSlipOnExceptionTest.java | 38 ++++++ 3 files changed, 233 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0163fe44/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java index a8ca7e0..c20742c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -17,17 +17,21 @@ package org.apache.camel.processor; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.AsyncProducerCallback; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; import org.apache.camel.FailedToCreateProducerException; import org.apache.camel.Message; +import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.Traceable; import org.apache.camel.builder.ExpressionBuilder; @@ -36,9 +40,12 @@ import org.apache.camel.impl.EmptyProducerCache; import org.apache.camel.impl.ProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.MessageHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; @@ -67,6 +74,20 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace protected Expression expression; protected String uriDelimiter; protected final CamelContext camelContext; + private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>(); + + /** + * Class that represents prepared fine grained error handlers when processing routingslip/dynamic-router exchanges + * <p/> + * This is similar to how multicast processor does. + */ + static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> { + + public PreparedErrorHandler(RouteContext key, Processor value) { + super(key, value); + } + + } /** * The iterator to be used for retrieving the next routing slip(s) to be used. @@ -304,6 +325,49 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace return copy; } + protected AsyncProcessor createErrorHandler(RouteContext routeContext, Exchange exchange, AsyncProcessor processor) { + AsyncProcessor answer = processor; + + boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class); + + // do not wrap in error handler if we are inside a try block + if (!tryBlock && routeContext != null) { + // wrap the producer in error handler so we have fine grained error handling on + // the output side instead of the input side + // this is needed to support redelivery on that output alone and not doing redelivery + // for the entire routingslip/dynamic-router block again which will start from scratch again + + // create key for cache + final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor); + + // lookup cached first to reuse and preserve memory + answer = errorHandlers.get(key); + if (answer != null) { + log.trace("Using existing error handler for: {}", processor); + return answer; + } + + log.trace("Creating error handler for: {}", processor); + ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); + // create error handler (create error handler directly to keep it light weight, + // instead of using ProcessorDefinition.wrapInErrorHandler) + try { + answer = (AsyncProcessor) builder.createErrorHandler(routeContext, processor); + + // must start the error handler + ServiceHelper.startServices(answer); + + // add to cache + errorHandlers.putIfAbsent(key, answer); + + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + return answer; + } + protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original, final AsyncCallback callback, final RoutingSlipIterator iter) { @@ -313,6 +377,11 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace boolean sync = producerCache.doInAsyncProducer(endpoint, exchange, null, callback, new AsyncProducerCallback() { public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, ExchangePattern exchangePattern, final AsyncCallback callback) { + + // rework error handling to support fine grained error handling + RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; + asyncProducer = createErrorHandler(routeContext, exchange, asyncProducer); + // set property which endpoint we send to exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri()); @@ -403,11 +472,14 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace } protected void doStop() throws Exception { - ServiceHelper.stopService(producerCache); + ServiceHelper.stopServices(producerCache, errorHandlers); } protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownService(producerCache); + ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers); + + // only clear error handlers when shutting down + errorHandlers.clear(); } public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { http://git-wip-us.apache.org/repos/asf/camel/blob/0163fe44/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterOnExceptionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterOnExceptionTest.java new file mode 100644 index 0000000..6684112 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterOnExceptionTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +public class DynamicRouterOnExceptionTest extends ContextTestSupport { + + public void testOk() throws Exception { + getMockEndpoint("mock:end").expectedMessageCount(1); + + MockEndpoint route = getMockEndpoint("mock:route"); + route.expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testException() throws Exception { + getMockEndpoint("mock:end").expectedMessageCount(1); + + MockEndpoint route = getMockEndpoint("mock:route"); + route.whenExchangeReceived(1, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.setException(new IllegalArgumentException("Forced")); + } + }); + route.whenExchangeReceived(2, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Bye World"); + } + }); + route.expectedMessageCount(2); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testExceptionTwo() throws Exception { + getMockEndpoint("mock:end").expectedMessageCount(2); + + MockEndpoint route = getMockEndpoint("mock:route"); + route.whenExchangeReceived(1, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.setException(new IllegalArgumentException("Forced")); + } + }); + route.whenExchangeReceived(2, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Bye World"); + } + }); + route.whenExchangeReceived(3, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.setException(new IllegalArgumentException("Forced")); + } + }); + route.whenExchangeReceived(4, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Bye World"); + } + }); + route.expectedMessageCount(4); + + template.sendBody("direct:start", "Hello World"); + template.sendBody("direct:start", "Bye World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(IllegalArgumentException.class) + .maximumRedeliveries(5); + + from("direct:start") + .dynamicRouter(method(DynamicRouterOnExceptionTest.class, "whereTo")) + .to("mock:end"); + } + }; + } + + public static String whereTo(Exchange exchange) { + Boolean invoked = exchange.getProperty("invoked", Boolean.class); + if (invoked == null) { + exchange.setProperty("invoked", true); + return "mock:route"; + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0163fe44/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipOnExceptionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipOnExceptionTest.java new file mode 100644 index 0000000..9f5f6f2 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipOnExceptionTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.builder.RouteBuilder; + +public class RoutingSlipOnExceptionTest extends DynamicRouterOnExceptionTest { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(IllegalArgumentException.class) + .maximumRedeliveries(5); + + from("direct:start") + .routingSlip(method(RoutingSlipOnExceptionTest.class, "whereTo")) + .to("mock:end"); + } + }; + } + +}