CAMEL-7490: Error handler in async redelivery mode should use same way to 
calculate next delay value as in sync mode.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e9d096a4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e9d096a4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e9d096a4

Branch: refs/heads/master
Commit: e9d096a4299f51ce701f9826ac531664da79dfc6
Parents: ffbe4c0
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Nov 29 10:19:10 2014 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Nov 29 10:19:10 2014 +0100

----------------------------------------------------------------------
 .../camel/processor/RedeliveryErrorHandler.java |  3 +-
 ...orHandlerNonBlockedRedeliveryHeaderTest.java | 83 ++++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e9d096a4/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index 20fda4a..b93f442 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -551,7 +551,8 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
             AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, 
callback, data);
 
             // calculate the redelivery delay
-            data.redeliveryDelay = 
data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, 
data.redeliveryCounter);
+            data.redeliveryDelay = determineRedeliveryDelay(exchange, 
data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
+
             if (data.redeliveryDelay > 0) {
                 // schedule the redelivery task
                 if (log.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/e9d096a4/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest.java
new file mode 100644
index 0000000..98d7521
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @version 
+ */
+public class RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest extends 
ContextTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RedeliveryErrorHandlerNonBlockedRedeliveryHeaderTest.class);
+
+    private static volatile int attempt;
+
+    public void testRedelivery() throws Exception {
+        MockEndpoint before = getMockEndpoint("mock:result");
+        before.expectedBodiesReceived("Hello World", "Hello Camel");
+
+        // we use NON blocked redelivery delay so the messages arrive which 
completes first
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived("Hello Camel", "Hello World");
+
+        template.sendBodyAndHeader("seda:start", "World", 
Exchange.REDELIVERY_DELAY, 500);
+        template.sendBodyAndHeader("seda:start", "Camel", 
Exchange.REDELIVERY_DELAY, 500);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // use async delayed which means non blocking
+                // set a high default value which we override by the headers 
so this test can complete in due time
+                
errorHandler(defaultErrorHandler().maximumRedeliveries(5).redeliveryDelay(10000).asyncDelayedRedelivery());
+
+                from("seda:start")
+                    .to("log:before")
+                    .to("mock:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws 
Exception {
+                            LOG.info("Processing at attempt " + attempt + " " 
+ exchange);
+
+                            String body = 
exchange.getIn().getBody(String.class);
+                            if (body.contains("World")) {
+                                if (++attempt <= 2) {
+                                    LOG.info("Processing failed will thrown an 
exception");
+                                    throw new IllegalArgumentException("Damn");
+                                }
+                            }
+
+                            exchange.getIn().setBody("Hello " + body);
+                            LOG.info("Processing at attempt " + attempt + " 
complete " + exchange);
+                        }
+                    })
+                    .to("log:after")
+                    .to("mock:result");
+            }
+        };
+    }
+}

Reply via email to