Author: davsclaus Date: Mon Feb 4 14:32:04 2013 New Revision: 1442135 URL: http://svn.apache.org/viewvc?rev=1442135&view=rev Log: CAMEL-6003: Using allowRedeliveryWhileStopping=false on Dead Letter Channel now moves the message to the DLC (when stopping) instead of rejecting the message.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java camel/trunk/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java camel/trunk/camel-core/src/test/resources/log4j.properties Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=1442135&r1=1442134&r2=1442135&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Mon Feb 4 14:32:04 2013 @@ -79,4 +79,11 @@ public class DeadLetterChannel extends R // DeadLetterChannel handles errors before sending to DLQ return ExpressionToPredicateAdapter.toPredicate(ExpressionBuilder.constantExpression(true)); } + + @Override + protected boolean isRunAllowedOnPreparingShutdown() { + // allow to run as we want to move the message to the DLC, instead of rejecting the message + return true; + } + } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1442135&r1=1442134&r2=1442135&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Mon Feb 4 14:32:04 2013 @@ 
-214,18 +214,46 @@ public abstract class RedeliveryErrorHan log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)"); return true; } else if (preparingShutdown) { - // do not allow redelivery as we are preparing for shutdown - log.trace("isRunAllowed() -> false (Run not allowed as we are preparing for shutdown)"); - return false; + // we are preparing for shutdown, now determine if we can still run + boolean answer = isRunAllowedOnPreparingShutdown(); + log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer); + return answer; } } - // fallback and use code from super - boolean answer = super.isRunAllowed(); + // we cannot run if we are stopping/stopped + boolean answer = !isStoppingOrStopped(); log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer); return answer; } + protected boolean isRunAllowedOnPreparingShutdown() { + return false; + } + + protected boolean isRedeliveryAllowed(RedeliveryData data) { + // redelivery policy can control if redelivery is allowed during stopping/shutdown + // but this only applies during a redelivery (counter must > 0) + if (data.redeliveryCounter > 0) { + boolean stopping = isStoppingOrStopped(); + if (!preparingShutdown && !stopping) { + log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)"); + return true; + } else { + // we are stopping or preparing to shutdown + if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) { + log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)"); + return true; + } else { + log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)"); + return false; + } + } + } + + return true; + } + @Override public void prepareShutdown(boolean forced) { // prepare for shutdown, eg do not allow redelivery if configured @@ -274,9 +302,12 @@ public abstract class RedeliveryErrorHan 
handleException(exchange, data); } - // compute if we are exhausted or not + // compute if we are exhausted, and whether redelivery is allowed boolean exhausted = isExhausted(exchange, data); - if (exhausted) { + boolean redeliverAllowed = isRedeliveryAllowed(data); + + // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC) + if (!redeliverAllowed || exhausted) { Processor target = null; boolean deliver = true; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java?rev=1442135&r1=1442134&r2=1442135&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java Mon Feb 4 14:32:04 2013 @@ -219,7 +219,11 @@ public abstract class ServiceSupport imp @Override public boolean isRunAllowed() { - return !(stopping.get() || stopped.get()); + return !isStoppingOrStopped(); + } + + public boolean isStoppingOrStopped() { + return stopping.get() || stopped.get(); } /** Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java?rev=1442135&r1=1442134&r2=1442135&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java Mon Feb 4 14:32:04 
2013 @@ -16,18 +16,23 @@ */ package org.apache.camel.processor; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.util.StopWatch; -import org.junit.Ignore; -@Ignore public class RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest extends ContextTestSupport { + private final AtomicInteger counter = new AtomicInteger(); + public void testRedeliveryErrorHandlerNoRedeliveryOnShutdown() throws Exception { getMockEndpoint("mock:foo").expectedMessageCount(1); getMockEndpoint("mock:deadLetter").expectedMessageCount(1); + getMockEndpoint("mock:deadLetter").setResultWaitTime(25000); template.sendBody("seda:foo", "Hello World"); @@ -35,13 +40,26 @@ public class RedeliveryDeadLetterErrorHa // should not take long to stop the route StopWatch watch = new StopWatch(); + // sleep 3 seconds to do some redeliveries before we stop + Thread.sleep(3000); + log.info("==== stopping route foo ===="); context.stopRoute("foo"); watch.stop(); - getMockEndpoint("mock:deadLetter").setResultWaitTime(25000); getMockEndpoint("mock:deadLetter").assertIsSatisfied(); - assertTrue("Should stop route faster, was " + watch.taken(), watch.taken() < 4000); + log.info("OnRedelivery processor counter {}", counter.get()); + + assertTrue("Should stop route faster, was " + watch.taken(), watch.taken() < 7000); + assertTrue("Redelivery counter should be >= 2 and < 12, was: " + counter.get(), counter.get() >= 2 && counter.get() < 12); + } + + private final class MyRedeliverProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + counter.incrementAndGet(); + } } @Override @@ -51,6 +69,7 @@ public class RedeliveryDeadLetterErrorHa public void configure() throws Exception { errorHandler(deadLetterChannel("mock:deadLetter") 
.allowRedeliveryWhileStopping(false) + .onRedelivery(new MyRedeliverProcessor()) .maximumRedeliveries(20).redeliveryDelay(1000).retryAttemptedLogLevel(LoggingLevel.INFO)); from("seda:foo").routeId("foo") Modified: camel/trunk/camel-core/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=1442135&r1=1442134&r2=1442135&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/resources/log4j.properties (original) +++ camel/trunk/camel-core/src/test/resources/log4j.properties Mon Feb 4 14:32:04 2013 @@ -37,6 +37,7 @@ log4j.logger.org.apache.camel.impl.Defau #log4j.logger.org.apache.camel.component.mock=DEBUG #log4j.logger.org.apache.camel.component.file=TRACE #log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE +#log4j.logger.org.apache.camel.processor.DeadLetterChannel=TRACE #log4j.logger.org.apache.camel.processor.Pipeline=TRACE #log4j.logger.org.apache.camel.processor.MulticastProcessor=TRACE #log4j.logger.org.apache.camel.processor.RecipientList=TRACE