Repository: camel Updated Branches: refs/heads/camel-2.16.x f17b51d18 -> 7d1f61759 refs/heads/camel-2.17.x 900cf6947 -> df2a31a4b refs/heads/master ddb852cdf -> 7c4dd0b4f
CAMEL-10050: Routing slip no longer caches error handlers. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7c4dd0b4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7c4dd0b4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7c4dd0b4 Branch: refs/heads/master Commit: 7c4dd0b4f6ecd4840e4ccdbf1d7c28f2e8cb5691 Parents: ddb852c Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jun 12 14:21:46 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jun 12 14:21:46 2016 +0200 ---------------------------------------------------------------------- .../camel/processor/DefaultErrorHandler.java | 4 ++ .../org/apache/camel/processor/RoutingSlip.java | 65 +++++++++----------- .../util/AsyncProcessorConverterHelper.java | 10 ++- .../camel/issues/RoutingSlipMemoryLeakTest.java | 15 +---- 4 files changed, 44 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java index 7f94887..f6dc784 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java @@ -69,4 +69,8 @@ public class DefaultErrorHandler extends RedeliveryErrorHandler { return "DefaultErrorHandler[" + output + "]"; } + public Processor getDeadLetterProcessor() { + return deadLetter; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java index d2d46af..c081d46 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -17,8 +17,6 @@ package org.apache.camel.processor; import java.util.Iterator; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -44,7 +42,6 @@ import org.apache.camel.spi.RouteContext; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; -import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.MessageHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; @@ -73,20 +70,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace protected Expression expression; protected String uriDelimiter; protected final CamelContext camelContext; - private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>(); - - /** - * Class that represents prepared fine grained error handlers when processing routingslip/dynamic-router exchanges - * <p/> - * This is similar to how multicast processor does. - */ - static final class PreparedErrorHandler extends KeyValueHolder<String, Processor> { - - PreparedErrorHandler(String key, Processor value) { - super(key, value); - } - - } /** * The iterator to be used for retrieving the next routing slip(s) to be used. @@ -336,16 +319,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace // this is needed to support redelivery on that output alone and not doing redelivery // for the entire routingslip/dynamic-router block again which will start from scratch again - // create key for cache - final PreparedErrorHandler key = new PreparedErrorHandler(endpoint.getEndpointUri(), processor); - - // lookup cached first to reuse and preserve memory - answer = errorHandlers.get(key); - if (answer != null) { - log.trace("Using existing error handler for: {}", processor); - return answer; - } - log.trace("Creating error handler for: {}", processor); ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); // create error handler (create error handler directly to keep it light weight, @@ -356,9 +329,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace // must start the error handler ServiceHelper.startServices(answer); - // add to cache - errorHandlers.putIfAbsent(key, answer); - } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -379,13 +349,13 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace // rework error handling to support fine grained error handling RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; - asyncProducer = createErrorHandler(routeContext, exchange, asyncProducer, endpoint); + AsyncProcessor target = createErrorHandler(routeContext, exchange, asyncProducer, endpoint); // set property which endpoint we send to exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri()); - return asyncProducer.process(exchange, new AsyncCallback() { + boolean answer = target.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the routing slip if (doneSync) { @@ -445,9 +415,33 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace // copy results back to the original exchange ExchangeHelper.copyResults(original, current); + + if (target instanceof DeadLetterChannel) { + Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter(); + try { + ServiceHelper.stopService(deadLetter); + } catch (Exception e) { + log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e); + } + } + callback.done(false); } }); + + // stop error handler if we completed synchronously + if (answer) { + if (target instanceof DeadLetterChannel) { + Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter(); + try { + ServiceHelper.stopService(deadLetter); + } catch (Exception e) { + log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e); + } + } + } + + return answer; } }); @@ -471,14 +465,11 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace } protected void doStop() throws Exception { - ServiceHelper.stopServices(producerCache, errorHandlers); + ServiceHelper.stopServices(producerCache); } protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers); - - // only clear error handlers when shutting down - errorHandlers.clear(); + ServiceHelper.stopAndShutdownServices(producerCache); } public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java index 6b1862e..14319ed 100644 --- a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java @@ -123,13 +123,21 @@ public final class AsyncProcessorConverterHelper { return false; } + if (processor == null) { + return false; + } + ProcessorToAsyncProcessorBridge that = (ProcessorToAsyncProcessorBridge) o; return processor.equals(that.processor); } @Override public int hashCode() { - return processor.hashCode(); + if (processor != null) { + return processor.hashCode(); + } else { + return 0; + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java index 7ead2b3..7129ad5 100644 --- a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java +++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java @@ -16,9 +16,6 @@ */ package org.apache.camel.issues; -import java.lang.reflect.Field; -import java.util.Map; - import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.processor.RoutingSlip; @@ -40,8 +37,7 @@ public class RoutingSlipMemoryLeakTest extends ContextTestSupport { template.sendBody("direct:start", "message " + i); } RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class); - Map errorHandlers = getRoutingSlipErrorHandlers(routingSlip); - assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size()); + assertNotNull(routingSlip); } @Override @@ -49,19 +45,14 @@ public class RoutingSlipMemoryLeakTest extends ContextTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + from("direct:start") .routingSlip(method(SlipProvider.class)).id("memory-leak"); } }; } - private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception { - Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers"); - errorHandlersField.setAccessible(true); - Map errorHandlers = (Map) errorHandlersField.get(routingSlip); - return errorHandlers; - } - public static class SlipProvider { public String computeSlip(String body) {