Author: davsclaus Date: Thu Aug 5 15:10:27 2010 New Revision: 982655 URL: http://svn.apache.org/viewvc?rev=982655&view=rev Log: CAMEL-3029: Routing engine should stop continue routing Exchange if the thread was interrupted by the JDK (such as future task cancelled etc.)
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java camel/trunk/camel-core/src/test/resources/log4j.properties Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=982655&r1=982654&r2=982655&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Thu Aug 5 15:10:27 2010 @@ -117,7 +117,11 @@ public abstract class DelayProcessorSupp exchange.setException(new RejectedExecutionException()); } else { // let caller run by processing - delay(delay, exchange); + try { + delay(delay, exchange); + } catch (InterruptedException ie) { + exchange.setException(ie); + } // then continue routing return super.process(exchange, callback); } @@ -157,7 +161,7 @@ public abstract class DelayProcessorSupp * @param delay the delay time in millis * @param exchange the exchange being processed */ - protected void delay(long delay, Exchange exchange) { + protected void delay(long delay, Exchange exchange) throws InterruptedException { // only run is we are started if (!isRunAllowed()) { return; @@ -169,19 +173,20 @@ public abstract class DelayProcessorSupp try { sleep(delay); } catch (InterruptedException e) { - handleSleepInterruptedException(e); + handleSleepInterruptedException(e, exchange); } } } /** - * Called when a sleep is interrupted; allows derived classes to handle this - * case differently + * Called when a sleep is interrupted; allows derived classes to handle this case differently */ - protected void handleSleepInterruptedException(InterruptedException e) { + protected void handleSleepInterruptedException(InterruptedException e, Exchange exchange) throws InterruptedException { if (log.isDebugEnabled()) { log.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped())); } + Thread.currentThread().interrupt(); + throw e; } protected long currentSystemTime() { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=982655&r1=982654&r2=982655&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Thu Aug 5 15:10:27 2010 @@ -261,6 +261,7 @@ public class MulticastProcessor extends // its to hard to do parallel async routing so we let the caller thread be synchronously // and have it pickup the replies and do the aggregation + // TODO: use a stopwatch to keep track of timeout left boolean timedOut = false; for (int i = 0; i < total.intValue(); i++) { Future<Exchange> future; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=982655&r1=982654&r2=982655&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Thu Aug 5 15:10:27 2010 @@ -236,7 +236,7 @@ public class Pipeline extends MulticastP boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); if (doStop) { if (LOG.isDebugEnabled()) { - LOG.debug("Exchange is marked to stop routing: " + exchange); + LOG.debug("ExchangeId: " + exchange.getExchangeId() + " is marked to stop routing: " + exchange); } answer = false; } @@ -246,7 +246,7 @@ public class Pipeline extends MulticastP } if (LOG.isTraceEnabled()) { - LOG.trace("Continue routing: " + answer); + LOG.trace("ExchangeId: " + exchange.getExchangeId() + " should continue routing: " + answer); } return answer; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=982655&r1=982654&r2=982655&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Thu Aug 5 15:10:27 2010 @@ -270,6 +270,9 @@ public abstract class RedeliveryErrorHan } catch (InterruptedException e) { // we was interrupted so break out exchange.setException(e); + // mark the exchange to stop continue routing when interrupted + // as we do not want to continue routing (for example a task has been cancelled) + exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); callback.done(data.sync); return data.sync; } @@ -396,12 +399,16 @@ public abstract class RedeliveryErrorHan * Strategy to determine if the exchange is done so we can continue */ protected boolean isDone(Exchange exchange) { + boolean answer = isCancelledOrInterrupted(exchange); + // only done if the exchange hasn't failed // and it has not been handled by the failure processor // or we are exhausted - boolean answer = exchange.getException() == null + if (!answer) { + answer = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange) || ExchangeHelper.isRedeliveryExhausted(exchange); + } if (log.isTraceEnabled()) { log.trace("Is exchangeId: " + exchange.getExchangeId() + " done? " + answer); @@ -410,6 +417,25 @@ public abstract class RedeliveryErrorHan } /** + * Strategy to determine if the exchange was cancelled or interrupted + */ + protected boolean isCancelledOrInterrupted(Exchange exchange) { + boolean answer = false; + + if (ExchangeHelper.isInterrupted(exchange)) { + // mark the exchange to stop continue routing when interrupted + // as we do not want to continue routing (for example a task has been cancelled) + exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); + answer = true; + } + + if (log.isTraceEnabled()) { + log.trace("Is exchangeId: " + exchange.getExchangeId() + " interrupted? " + answer); + } + return answer; + } + + /** * Returns the output processor */ public Processor getOutput() { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=982655&r1=982654&r2=982655&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Thu Aug 5 15:10:27 2010 @@ -433,6 +433,10 @@ public final class ExchangeHelper { return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); } + public static boolean isInterrupted(Exchange exchange) { + return exchange.getException(InterruptedException.class) != null; + } + /** * Extracts the body from the given exchange. * <p/> Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java?rev=982655&r1=982654&r2=982655&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java Thu Aug 5 15:10:27 2010 @@ -32,8 +32,16 @@ public class MulticastParallelTimeoutTes // A will timeout so we only get B and C mock.expectedBodiesReceived("BC"); + getMockEndpoint("mock:A").expectedMessageCount(0); + getMockEndpoint("mock:B").expectedMessageCount(1); + getMockEndpoint("mock:C").expectedMessageCount(1); + template.sendBody("direct:start", "Hello"); + // wait at least longer than the delay in A so we can ensure its being cancelled + // and wont continue routing + Thread.sleep(4000); + assertMockEndpointsSatisfied(); } @@ -60,11 +68,11 @@ public class MulticastParallelTimeoutTes .end() .to("mock:result"); - from("direct:a").delay(3000).setBody(constant("A")); + from("direct:a").delay(3000).to("mock:A").setBody(constant("A")); - from("direct:b").setBody(constant("B")); + from("direct:b").to("mock:B").setBody(constant("B")); - from("direct:c").delay(500).setBody(constant("C")); + from("direct:c").delay(500).to("mock:C").setBody(constant("C")); // END SNIPPET: e1 } }; Modified: camel/trunk/camel-core/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=982655&r1=982654&r2=982655&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/resources/log4j.properties (original) +++ camel/trunk/camel-core/src/test/resources/log4j.properties Thu Aug 5 15:10:27 2010 @@ -41,7 +41,10 @@ log4j.logger.org.apache.camel.management log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN #log4j.logger.org.apache.camel.impl=TRACE #log4j.logger.org.apache.camel.util.FileUtil=TRACE +#log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE #log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE +#log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE +#log4j.logger.org.apache.camel.processor.Pipeline=TRACE # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender