Author: davsclaus
Date: Thu Aug  5 15:10:27 2010
New Revision: 982655

URL: http://svn.apache.org/viewvc?rev=982655&view=rev
Log:
CAMEL-3029: Routing engine should stop routing the Exchange if the thread 
was interrupted by the JDK (for example, when a future task is cancelled).

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java
    camel/trunk/camel-core/src/test/resources/log4j.properties

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=982655&r1=982654&r2=982655&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
 Thu Aug  5 15:10:27 2010
@@ -117,7 +117,11 @@ public abstract class DelayProcessorSupp
                         exchange.setException(new 
RejectedExecutionException());
                     } else {
                         // let caller run by processing
-                        delay(delay, exchange);
+                        try {
+                            delay(delay, exchange);
+                        } catch (InterruptedException ie) {
+                            exchange.setException(ie);
+                        }
                         // then continue routing
                         return super.process(exchange, callback);
                     }
@@ -157,7 +161,7 @@ public abstract class DelayProcessorSupp
      * @param delay the delay time in millis
      * @param exchange the exchange being processed
      */
-    protected void delay(long delay, Exchange exchange) {
+    protected void delay(long delay, Exchange exchange) throws 
InterruptedException {
         // only run is we are started
         if (!isRunAllowed()) {
             return;
@@ -169,19 +173,20 @@ public abstract class DelayProcessorSupp
             try {
                 sleep(delay);
             } catch (InterruptedException e) {
-                handleSleepInterruptedException(e);
+                handleSleepInterruptedException(e, exchange);
             }
         }
     }
 
     /**
-     * Called when a sleep is interrupted; allows derived classes to handle 
this
-     * case differently
+     * Called when a sleep is interrupted; allows derived classes to handle 
this case differently
      */
-    protected void handleSleepInterruptedException(InterruptedException e) {
+    protected void handleSleepInterruptedException(InterruptedException e, 
Exchange exchange) throws InterruptedException {
         if (log.isDebugEnabled()) {
             log.debug("Sleep interrupted, are we stopping? " + (isStopping() 
|| isStopped()));
         }
+        Thread.currentThread().interrupt();
+        throw e;
     }
 
     protected long currentSystemTime() {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=982655&r1=982654&r2=982655&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Thu Aug  5 15:10:27 2010
@@ -261,6 +261,7 @@ public class MulticastProcessor extends 
 
         // its to hard to do parallel async routing so we let the caller 
thread be synchronously
         // and have it pickup the replies and do the aggregation
+        // TODO: use a stopwatch to keep track of timeout left
         boolean timedOut = false;
         for (int i = 0; i < total.intValue(); i++) {
             Future<Exchange> future;

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=982655&r1=982654&r2=982655&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java 
Thu Aug  5 15:10:27 2010
@@ -236,7 +236,7 @@ public class Pipeline extends MulticastP
             boolean doStop = 
exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
             if (doStop) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Exchange is marked to stop routing: " + 
exchange);
+                    LOG.debug("ExchangeId: " + exchange.getExchangeId() + " is 
marked to stop routing: " + exchange);
                 }
                 answer = false;
             }
@@ -246,7 +246,7 @@ public class Pipeline extends MulticastP
         }
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Continue routing: " + answer);
+            LOG.trace("ExchangeId: " + exchange.getExchangeId() + " should 
continue routing: " + answer);
         }
         return answer;
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=982655&r1=982654&r2=982655&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 Thu Aug  5 15:10:27 2010
@@ -270,6 +270,9 @@ public abstract class RedeliveryErrorHan
                         } catch (InterruptedException e) {
                             // we was interrupted so break out
                             exchange.setException(e);
+                            // mark the exchange to stop continue routing when 
interrupted
+                            // as we do not want to continue routing (for 
example a task has been cancelled)
+                            exchange.setProperty(Exchange.ROUTE_STOP, 
Boolean.TRUE);
                             callback.done(data.sync);
                             return data.sync;
                         }
@@ -396,12 +399,16 @@ public abstract class RedeliveryErrorHan
      * Strategy to determine if the exchange is done so we can continue
      */
     protected boolean isDone(Exchange exchange) {
+        boolean answer = isCancelledOrInterrupted(exchange);
+
         // only done if the exchange hasn't failed
         // and it has not been handled by the failure processor
         // or we are exhausted
-        boolean answer = exchange.getException() == null
+        if (!answer) {
+            answer = exchange.getException() == null
                 || ExchangeHelper.isFailureHandled(exchange)
                 || ExchangeHelper.isRedeliveryExhausted(exchange);
+        }
 
         if (log.isTraceEnabled()) {
             log.trace("Is exchangeId: " + exchange.getExchangeId() + " done? " 
+ answer);
@@ -410,6 +417,25 @@ public abstract class RedeliveryErrorHan
     }
 
     /**
+     * Strategy to determine if the exchange was cancelled or interrupted
+     */
+    protected boolean isCancelledOrInterrupted(Exchange exchange) {
+        boolean answer = false;
+
+        if (ExchangeHelper.isInterrupted(exchange)) {
+            // mark the exchange to stop continue routing when interrupted
+            // as we do not want to continue routing (for example a task has 
been cancelled)
+            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
+            answer = true;
+        }
+
+        if (log.isTraceEnabled()) {
+            log.trace("Is exchangeId: " + exchange.getExchangeId() + " 
interrupted? " + answer);
+        }
+        return answer;
+    }
+
+    /**
      * Returns the output processor
      */
     public Processor getOutput() {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=982655&r1=982654&r2=982655&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java 
Thu Aug  5 15:10:27 2010
@@ -433,6 +433,10 @@ public final class ExchangeHelper {
         return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, 
Boolean.class);
     }
 
+    public static boolean isInterrupted(Exchange exchange) {
+        return exchange.getException(InterruptedException.class) != null;
+    }
+
     /**
      * Extracts the body from the given exchange.
      * <p/>

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java?rev=982655&r1=982654&r2=982655&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java
 Thu Aug  5 15:10:27 2010
@@ -32,8 +32,16 @@ public class MulticastParallelTimeoutTes
         // A will timeout so we only get B and C
         mock.expectedBodiesReceived("BC");
 
+        getMockEndpoint("mock:A").expectedMessageCount(0);
+        getMockEndpoint("mock:B").expectedMessageCount(1);
+        getMockEndpoint("mock:C").expectedMessageCount(1);
+
         template.sendBody("direct:start", "Hello");
 
+        // wait at least longer than the delay in A so we can ensure its being 
cancelled
+        // and wont continue routing
+        Thread.sleep(4000);
+
         assertMockEndpointsSatisfied();
     }
 
@@ -60,11 +68,11 @@ public class MulticastParallelTimeoutTes
                     .end()
                     .to("mock:result");
 
-                from("direct:a").delay(3000).setBody(constant("A"));
+                
from("direct:a").delay(3000).to("mock:A").setBody(constant("A"));
 
-                from("direct:b").setBody(constant("B"));
+                from("direct:b").to("mock:B").setBody(constant("B"));
 
-                from("direct:c").delay(500).setBody(constant("C"));
+                
from("direct:c").delay(500).to("mock:C").setBody(constant("C"));
                 // END SNIPPET: e1
             }
         };

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=982655&r1=982654&r2=982655&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Thu Aug  5 
15:10:27 2010
@@ -41,7 +41,10 @@ log4j.logger.org.apache.camel.management
 log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
 #log4j.logger.org.apache.camel.impl=TRACE
 #log4j.logger.org.apache.camel.util.FileUtil=TRACE
+#log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE
 #log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE
+#log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE
+#log4j.logger.org.apache.camel.processor.Pipeline=TRACE
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender


Reply via email to