Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x f17b51d18 -> 7d1f61759
  refs/heads/camel-2.17.x 900cf6947 -> df2a31a4b
  refs/heads/master ddb852cdf -> 7c4dd0b4f


CAMEL-10050: Routing slip no longer caches error handlers.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7c4dd0b4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7c4dd0b4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7c4dd0b4

Branch: refs/heads/master
Commit: 7c4dd0b4f6ecd4840e4ccdbf1d7c28f2e8cb5691
Parents: ddb852c
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Jun 12 14:21:46 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Jun 12 14:21:46 2016 +0200

----------------------------------------------------------------------
 .../camel/processor/DefaultErrorHandler.java    |  4 ++
 .../org/apache/camel/processor/RoutingSlip.java | 65 +++++++++-----------
 .../util/AsyncProcessorConverterHelper.java     | 10 ++-
 .../camel/issues/RoutingSlipMemoryLeakTest.java | 15 +----
 4 files changed, 44 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
index 7f94887..f6dc784 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
@@ -69,4 +69,8 @@ public class DefaultErrorHandler extends RedeliveryErrorHandler {
         return "DefaultErrorHandler[" + output + "]";
     }
 
+    public Processor getDeadLetterProcessor() {
+        return deadLetter;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index d2d46af..c081d46 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -17,8 +17,6 @@
 package org.apache.camel.processor;
 
 import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -44,7 +42,6 @@ import org.apache.camel.spi.RouteContext;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -73,20 +70,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
     protected Expression expression;
     protected String uriDelimiter;
     protected final CamelContext camelContext;
-    private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>();
-
-    /**
-     * Class that represents prepared fine grained error handlers when processing routingslip/dynamic-router exchanges
-     * <p/>
-     * This is similar to how multicast processor does.
-     */
-    static final class PreparedErrorHandler extends KeyValueHolder<String, Processor> {
-
-        PreparedErrorHandler(String key, Processor value) {
-            super(key, value);
-        }
-
-    }
 
     /**
      * The iterator to be used for retrieving the next routing slip(s) to be used.
@@ -336,16 +319,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
             // this is needed to support redelivery on that output alone and not doing redelivery
             // for the entire routingslip/dynamic-router block again which will start from scratch again
 
-            // create key for cache
-            final PreparedErrorHandler key = new PreparedErrorHandler(endpoint.getEndpointUri(), processor);
-
-            // lookup cached first to reuse and preserve memory
-            answer = errorHandlers.get(key);
-            if (answer != null) {
-                log.trace("Using existing error handler for: {}", processor);
-                return answer;
-            }
-
             log.trace("Creating error handler for: {}", processor);
             ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
             // create error handler (create error handler directly to keep it light weight,
@@ -356,9 +329,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
                 // must start the error handler
                 ServiceHelper.startServices(answer);
 
-                // add to cache
-                errorHandlers.putIfAbsent(key, answer);
-
             } catch (Exception e) {
                 throw ObjectHelper.wrapRuntimeCamelException(e);
             }
@@ -379,13 +349,13 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
 
                 // rework error handling to support fine grained error handling
                 RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
-                asyncProducer = createErrorHandler(routeContext, exchange, asyncProducer, endpoint);
+                AsyncProcessor target = createErrorHandler(routeContext, exchange, asyncProducer, endpoint);
 
                 // set property which endpoint we send to
                 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
                 exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
 
-                return asyncProducer.process(exchange, new AsyncCallback() {
+                boolean answer = target.process(exchange, new AsyncCallback() {
                     public void done(boolean doneSync) {
                         // we only have to handle async completion of the routing slip
                         if (doneSync) {
@@ -445,9 +415,33 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
 
                         // copy results back to the original exchange
                         ExchangeHelper.copyResults(original, current);
+
+                        if (target instanceof DeadLetterChannel) {
+                            Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter();
+                            try {
+                                ServiceHelper.stopService(deadLetter);
+                            } catch (Exception e) {
+                                log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e);
+                            }
+                        }
+
                         callback.done(false);
                     }
                 });
+
+                // stop error handler if we completed synchronously
+                if (answer) {
+                    if (target instanceof DeadLetterChannel) {
+                        Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter();
+                        try {
+                            ServiceHelper.stopService(deadLetter);
+                        } catch (Exception e) {
+                            log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e);
+                        }
+                    }
+                }
+
+                return answer;
             }
         });
 
@@ -471,14 +465,11 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(producerCache, errorHandlers);
+        ServiceHelper.stopServices(producerCache);
     }
 
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers);
-
-        // only clear error handlers when shutting down
-        errorHandlers.clear();
+        ServiceHelper.stopAndShutdownServices(producerCache);
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {

http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
index 6b1862e..14319ed 100644
--- a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
@@ -123,13 +123,21 @@ public final class AsyncProcessorConverterHelper {
                 return false;
             }
 
+            if (processor == null) {
+                return false;
+            }
+
             ProcessorToAsyncProcessorBridge that = (ProcessorToAsyncProcessorBridge) o;
             return processor.equals(that.processor);
         }
 
         @Override
         public int hashCode() {
-            return processor.hashCode();
+            if (processor != null) {
+                return processor.hashCode();
+            } else {
+                return 0;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java
index 7ead2b3..7129ad5 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.issues;
 
-import java.lang.reflect.Field;
-import java.util.Map;
-
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.RoutingSlip;
@@ -40,8 +37,7 @@ public class RoutingSlipMemoryLeakTest extends ContextTestSupport {
             template.sendBody("direct:start", "message " + i);
         }
         RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class);
-        Map errorHandlers = getRoutingSlipErrorHandlers(routingSlip);
-        assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size());
+        assertNotNull(routingSlip);
     }
 
     @Override
@@ -49,19 +45,14 @@ public class RoutingSlipMemoryLeakTest extends ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
                 from("direct:start")
                     .routingSlip(method(SlipProvider.class)).id("memory-leak");
             }
         };
     }
 
-    private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception {
-        Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers");
-        errorHandlersField.setAccessible(true);
-        Map errorHandlers = (Map) errorHandlersField.get(routingSlip);
-        return errorHandlers;
-    }
-
     public static class SlipProvider {
 
         public String computeSlip(String body) {

Reply via email to