Repository: camel Updated Branches: refs/heads/master 8fb2303be -> af8d184ed
CAMEL-5585: RedeliverErrorHandler - Should quicker reject running scheduled redeliver tasks if shutting down and not allowed to do redeliver Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af8d184e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af8d184e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af8d184e Branch: refs/heads/master Commit: af8d184ed549169e2290c6014199ce2de1cb1586 Parents: 8fb2303 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri May 6 14:47:32 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri May 6 14:49:52 2016 +0200 ---------------------------------------------------------------------- .../camel/processor/RedeliveryErrorHandler.java | 61 +++++++++++++++- ...iveryWhileStoppingDeadLetterChannelTest.java | 73 ++++++++++++++++++++ .../NotAllowRedeliveryWhileStoppingTest.java | 61 ++++++++++++++++ 3 files changed, 193 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/af8d184e/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java index e172796..361f088 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java @@ -48,6 +48,7 @@ import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.MessageHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; +import org.apache.camel.util.StopWatch; import org.apache.camel.util.URISupport; /** @@ -119,11 +120,58 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme } /** + * Task for sleeping during redelivery attempts. + * <p/> + * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool + * is used for sleeping and trigger redeliveries. + */ + private final class RedeliverSleepTask { + + private final RedeliveryPolicy policy; + private final long delay; + + RedeliverSleepTask(RedeliveryPolicy policy, long delay) { + this.policy = policy; + this.delay = delay; + } + + public boolean sleep() throws InterruptedException { + // for small delays then just sleep + if (delay < 1000) { + policy.sleep(delay); + return true; + } + + StopWatch watch = new StopWatch(); + + log.debug("Sleeping for: {} millis until attempting redelivery", delay); + while (watch.taken() < delay) { + // sleep using 1 sec interval + + long delta = delay - watch.taken(); + long max = Math.min(1000, delta); + if (max > 0) { + log.trace("Sleeping for: {} millis until waking up for re-check", max); + Thread.sleep(max); + } + + // are we preparing for shutdown then only do redelivery if allowed + if (preparingShutdown && !policy.isAllowRedeliveryWhileStopping()) { + log.debug("Rejected redelivery while stopping"); + return false; + } + } + + return true; + } + } + + /** * Tasks which performs asynchronous redelivery attempts, and being triggered by a * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task * has to be delayed before a redelivery attempt is performed. */ - private class AsyncRedeliveryTask implements Callable<Boolean> { + private final class AsyncRedeliveryTask implements Callable<Boolean> { private final Exchange exchange; private final AsyncCallback callback; @@ -439,8 +487,17 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme try { // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping redeliverySleepCounter.incrementAndGet(); - data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); + RedeliverSleepTask task = new RedeliverSleepTask(data.currentRedeliveryPolicy, data.redeliveryDelay); + boolean complete = task.sleep(); redeliverySleepCounter.decrementAndGet(); + if (!complete) { + // the task was rejected + exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping")); + // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange + exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); + // jump to start of loop which then detects that we are failed and exhausted + continue; + } } catch (InterruptedException e) { redeliverySleepCounter.decrementAndGet(); // we was interrupted so break out http://git-wip-us.apache.org/repos/asf/camel/blob/af8d184e/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java new file mode 100644 index 0000000..519e219 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.concurrent.RejectedExecutionException; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.StopWatch; + +/** + * @version + */ +public class NotAllowRedeliveryWhileStoppingDeadLetterChannelTest extends ContextTestSupport { + + public void testRedelivery() throws Exception { + StopWatch watch = new StopWatch(); + + MockEndpoint before = getMockEndpoint("mock:foo"); + before.expectedMessageCount(1); + + template.sendBody("seda:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + Thread.sleep(500); + + context.stopRoute("foo"); + + // we should reject the task and stop quickly + assertTrue("Should stop quickly: " + watch.taken(), watch.taken() < 5000); + + // should go to DLC + Exchange dead = getMockEndpoint("mock:dead").getExchanges().get(0); + assertNotNull(dead); + + Throwable cause = dead.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); + assertNotNull(cause); + assertIsInstanceOf(RejectedExecutionException.class, cause); + assertEquals("Redelivery not allowed while stopping", cause.getMessage()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead") + .maximumRedeliveries(5).redeliveryDelay(5000).allowRedeliveryWhileStopping(false)); + + from("seda:start").routeId("foo") + .to("mock:foo") + .throwException(new IllegalArgumentException("Forced")); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/af8d184e/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java new file mode 100644 index 0000000..5ca4b59 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.StopWatch; + +/** + * @version + */ +public class NotAllowRedeliveryWhileStoppingTest extends ContextTestSupport { + + public void testRedelivery() throws Exception { + StopWatch watch = new StopWatch(); + + MockEndpoint before = getMockEndpoint("mock:foo"); + before.expectedMessageCount(1); + + template.sendBody("seda:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + Thread.sleep(500); + + context.stop(); + + // we should reject the task and stop quickly + assertTrue("Should stop quickly: " + watch.taken(), watch.taken() < 5000); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(defaultErrorHandler() + .maximumRedeliveries(5).redeliveryDelay(5000).allowRedeliveryWhileStopping(false)); + + from("seda:start") + .to("mock:foo") + .throwException(new IllegalArgumentException("Forced")); + } + }; + } +}