Repository: camel Updated Branches: refs/heads/camel-2.13.x 3f1fdb647 -> 72e8ede12 refs/heads/camel-2.14.x 08aa77bc7 -> 4d4c39c3e
CAMEL-7490: Error handler in async redelivery mode should use same way to calculate next delay value as in sync mode. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/72e8ede1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/72e8ede1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/72e8ede1 Branch: refs/heads/camel-2.13.x Commit: 72e8ede12a7e0dba013af912e561695634d7f9f1 Parents: 3f1fdb6 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Nov 29 10:19:10 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Nov 29 10:47:16 2014 +0100 ---------------------------------------------------------------------- .../camel/processor/RedeliveryErrorHandler.java | 3 +- ...orHandlerNonBlockedRedeliveryHeaderTest.java | 83 ++++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/72e8ede1/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java index beb2b5e..e48eecd 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java @@ -542,7 +542,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); // calculate the redelivery delay - data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter); + data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter); + if (data.redeliveryDelay > 0) { // schedule the redelivery task if (log.isTraceEnabled()) { http://git-wip-us.apache.org/repos/asf/camel/blob/72e8ede1/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest.java b/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest.java new file mode 100644 index 0000000..98d7521 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @version + */ +public class RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest extends ContextTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest.class); + + private static volatile int attempt; + + public void testRedelivery() throws Exception { + MockEndpoint before = getMockEndpoint("mock:result"); + before.expectedBodiesReceived("Hello World", "Hello Camel"); + + // we use NON blocked redelivery delay so the messages arrive which completes first + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedBodiesReceived("Hello Camel", "Hello World"); + + template.sendBodyAndHeader("seda:start", "World", Exchange.REDELIVERY_DELAY, 500); + template.sendBodyAndHeader("seda:start", "Camel", Exchange.REDELIVERY_DELAY, 500); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // use async delayed which means non blocking + // set a high default value which we override by the headers so this test can complete in due time + errorHandler(defaultErrorHandler().maximumRedeliveries(5).redeliveryDelay(10000).asyncDelayedRedelivery()); + + from("seda:start") + .to("log:before") + .to("mock:before") + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + LOG.info("Processing at attempt " + attempt + " " + exchange); + + String body = exchange.getIn().getBody(String.class); + if (body.contains("World")) { + if (++attempt <= 2) { + LOG.info("Processing failed will thrown an exception"); + throw new IllegalArgumentException("Damn"); + } + } + + exchange.getIn().setBody("Hello " + body); + LOG.info("Processing at attempt " + attempt + " complete " + exchange); + } + }) + .to("log:after") + .to("mock:result"); + } + }; + } +}