Updated Branches:
  refs/heads/master 96ce8d0f8 -> fabc86825

CAMEL-6246: Added optimistic lock retry policy to aggregate EIP. Thanks to 
Aaron Whiteside for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fabc8682
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fabc8682
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fabc8682

Branch: refs/heads/master
Commit: fabc8682592f1169b006e1de85840eb758b8fadb
Parents: 96ce8d0
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat May 11 11:05:12 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat May 11 11:05:12 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/model/AggregateDefinition.java    |   34 ++++-
 .../model/OptimisticLockRetryPolicyDefinition.java |  140 ++++++++++++++
 .../processor/aggregate/AggregateProcessor.java    |   27 ++-
 .../aggregate/OptimisticLockRetryPolicy.java       |  148 ++++++++++++++
 .../resources/org/apache/camel/model/jaxb.index    |    1 +
 ...DistributedConcurrentPerCorrelationKeyTest.java |    2 -
 .../DistributedOptimisticLockFailingTest.java      |  149 +++++++++++++++
 .../aggregator/OptimisticLockRetryPolicyTest.java  |  122 ++++++++++++
 8 files changed, 612 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fabc8682/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index d8f26d8..fce58e5 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -38,6 +38,7 @@ import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
+import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.concurrent.SynchronousExecutorService;
@@ -58,6 +59,8 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
     private ExpressionSubElementDefinition completionTimeoutExpression;
     @XmlElement(name = "completionSize")
     private ExpressionSubElementDefinition completionSizeExpression;
+    @XmlElement(name = "optimisticLockRetryPolicy")
+    private OptimisticLockRetryPolicyDefinition 
optimisticLockRetryPolicyDefinition;
     @XmlTransient
     private ExpressionDefinition expression;
     @XmlElementRef
@@ -70,6 +73,8 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
     private ScheduledExecutorService timeoutCheckerExecutorService;
     @XmlTransient
     private AggregationRepository aggregationRepository;
+    @XmlTransient
+    private OptimisticLockRetryPolicy optimisticLockRetryPolicy;
     @XmlAttribute
     private Boolean parallelProcessing;
     @XmlAttribute
@@ -237,7 +242,13 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
         if (getForceCompletionOnStop() != null) {
             answer.setForceCompletionOnStop(getForceCompletionOnStop());
         }
-
+        if (optimisticLockRetryPolicy == null) {
+            if (getOptimisticLockRetryPolicyDefinition() != null) {
+                
answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy());
+            }
+        } else {
+            answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy);
+        }
         return answer;
     }
 
@@ -316,6 +327,22 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
         this.completionSize = completionSize;
     }
 
+    public OptimisticLockRetryPolicyDefinition 
getOptimisticLockRetryPolicyDefinition() {
+        return optimisticLockRetryPolicyDefinition;
+    }
+
+    public void 
setOptimisticLockRetryPolicyDefinition(OptimisticLockRetryPolicyDefinition 
optimisticLockRetryPolicyDefinition) {
+        this.optimisticLockRetryPolicyDefinition = 
optimisticLockRetryPolicyDefinition;
+    }
+
+    public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() {
+        return optimisticLockRetryPolicy;
+    }
+
+    public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy 
optimisticLockRetryPolicy) {
+        this.optimisticLockRetryPolicy = optimisticLockRetryPolicy;
+    }
+
     public Long getCompletionInterval() {
         return completionInterval;
     }
@@ -738,6 +765,11 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
         setOptimisticLocking(true);
         return this;
     }
+
+    public AggregateDefinition 
optimisticLockRetryPolicy(OptimisticLockRetryPolicy policy) {
+        setOptimisticLockRetryPolicy(policy);
+        return this;
+    }
     
     public AggregateDefinition executorService(ExecutorService 
executorService) {
         setExecutorService(executorService);

http://git-wip-us.apache.org/repos/asf/camel/blob/fabc8682/camel-core/src/main/java/org/apache/camel/model/OptimisticLockRetryPolicyDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/OptimisticLockRetryPolicyDefinition.java
 
b/camel-core/src/main/java/org/apache/camel/model/OptimisticLockRetryPolicyDefinition.java
new file mode 100644
index 0000000..87b449d
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/model/OptimisticLockRetryPolicyDefinition.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
+
+/**
+ * Represents an XML &lt;optimisticLockRetryPolicy/&gt; element
+ *
+ * @version
+ */
+@XmlRootElement(name = "optimisticLockRetryPolicy")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class OptimisticLockRetryPolicyDefinition {
+    @XmlAttribute
+    private Integer maximumRetries;
+    @XmlAttribute
+    private Long retryDelay;
+    @XmlAttribute
+    private Long maximumRetryDelay;
+    @XmlAttribute
+    private Boolean exponentialBackOff;
+    @XmlAttribute
+    private Boolean randomBackOff;
+
+    public OptimisticLockRetryPolicyDefinition() {
+    }
+
+    public OptimisticLockRetryPolicy createOptimisticLockRetryPolicy() {
+        OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy();
+        if (maximumRetries != null) {
+            policy.setMaximumRetries(maximumRetries);
+        }
+        if (retryDelay != null) {
+            policy.setRetryDelay(retryDelay);
+        }
+        if (maximumRetryDelay != null) {
+            policy.setMaximumRetryDelay(maximumRetryDelay);
+        }
+        if (exponentialBackOff != null) {
+            policy.setExponentialBackOff(exponentialBackOff);
+        }
+        if (randomBackOff != null) {
+            policy.setRandomBackOff(randomBackOff);
+        }
+        return policy;
+    }
+
+    public OptimisticLockRetryPolicyDefinition maximumRetries(int 
maximumRetries) {
+        setMaximumRetries(maximumRetries);
+        return this;
+    }
+
+    public Integer getMaximumRetries() {
+        return maximumRetries;
+    }
+
+    public void setMaximumRetries(Integer maximumRetries) {
+        this.maximumRetries = maximumRetries;
+    }
+
+    public OptimisticLockRetryPolicyDefinition retryDelay(long retryDelay) {
+        setRetryDelay(retryDelay);
+        return this;
+    }
+
+    public Long getRetryDelay() {
+        return retryDelay;
+    }
+
+    public void setRetryDelay(Long retryDelay) {
+        this.retryDelay = retryDelay;
+    }
+
+    public OptimisticLockRetryPolicyDefinition maximumRetryDelay(long 
maximumRetryDelay) {
+        setMaximumRetryDelay(maximumRetryDelay);
+        return this;
+    }
+
+    public Long getMaximumRetryDelay() {
+        return maximumRetryDelay;
+    }
+
+    public void setMaximumRetryDelay(Long maximumRetryDelay) {
+        this.maximumRetryDelay = maximumRetryDelay;
+    }
+
+    public OptimisticLockRetryPolicyDefinition exponentialBackOff() {
+        return exponentialBackOff(true);
+    }
+
+    public OptimisticLockRetryPolicyDefinition exponentialBackOff(boolean 
exponentialBackOff) {
+        setExponentialBackOff(exponentialBackOff);
+        return this;
+    }
+
+    public Boolean getExponentialBackOff() {
+        return exponentialBackOff;
+    }
+
+    public void setExponentialBackOff(Boolean exponentialBackOff) {
+        this.exponentialBackOff = exponentialBackOff;
+    }
+
+    public OptimisticLockRetryPolicyDefinition randomBackOff() {
+        return randomBackOff(true);
+    }
+
+    public OptimisticLockRetryPolicyDefinition randomBackOff(boolean 
randomBackOff) {
+        setRandomBackOff(randomBackOff);
+        return this;
+    }
+
+    public Boolean getRandomBackOff() {
+        return randomBackOff;
+    }
+
+    public void setRandomBackOff(Boolean randomBackOff) {
+        this.randomBackOff = randomBackOff;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/fabc8682/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index c113c60..46ef290 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -21,7 +21,6 @@ import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -90,6 +89,7 @@ public class AggregateProcessor extends ServiceSupport 
implements Processor, Nav
     private final Expression correlationExpression;
     private final ExecutorService executorService;
     private final boolean shutdownExecutorService;
+    private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new 
OptimisticLockRetryPolicy();
     private ScheduledExecutorService timeoutCheckerExecutorService;
     private boolean shutdownTimeoutCheckerExecutorService;
     private ScheduledExecutorService recoverService;
@@ -194,24 +194,27 @@ public class AggregateProcessor extends ServiceSupport 
implements Processor, Nav
 
         // when optimist locking is enabled we keep trying until we succeed
         if (optimisticLocking) {
-            boolean done = false;
+            boolean exhaustedRetries = true;
             int attempt = 0;
-            while (!done) {
+            do {
                 attempt++;
                 // copy exchange, and do not share the unit of work
                 // the aggregated output runs in another unit of work
                 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
false);
                 try {
                     doAggregation(key, copy);
-                    done = true;
+                    exhaustedRetries = false;
+                    break;
                 } catch 
(OptimisticLockingAggregationRepository.OptimisticLockingException e) {
                     LOG.trace("On attempt {} 
OptimisticLockingAggregationRepository: {} threw OptimisticLockingException 
while trying to add() key: {} and exchange: {}",
                               new Object[]{attempt, aggregationRepository, 
key, copy, e});
+                    optimisticLockRetryPolicy.doDelay(attempt);
                 }
-                // use a little random delay to avoid being to aggressive when 
retrying, and avoid potential clashing
-                int ran = new Random().nextInt(1000);
-                LOG.trace("Sleeping for {} millis before attempting again", 
ran);
-                Thread.sleep(ran);
+            } while (optimisticLockRetryPolicy.shouldRetry(attempt));
+
+            if (exhaustedRetries) {
+                throw new CamelExchangeException("Exhausted optimistic locking 
retry attempts, tried " + attempt + " times", exchange,
+                        new 
OptimisticLockingAggregationRepository.OptimisticLockingException());
             }
         } else {
             // copy exchange, and do not share the unit of work
@@ -689,6 +692,14 @@ public class AggregateProcessor extends ServiceSupport 
implements Processor, Nav
         this.shutdownTimeoutCheckerExecutorService = 
shutdownTimeoutCheckerExecutorService;
     }
 
+    public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy 
optimisticLockRetryPolicy) {
+        this.optimisticLockRetryPolicy = optimisticLockRetryPolicy;
+    }
+
+    public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() {
+        return optimisticLockRetryPolicy;
+    }
+
     /**
      * On completion task which keeps the booking of the in progress up to date
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/fabc8682/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockRetryPolicy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockRetryPolicy.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockRetryPolicy.java
new file mode 100644
index 0000000..3ca3081
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockRetryPolicy.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import java.util.Random;
+
+/**
+ * Class to control how failed optimistic locks are retried. This policy 
supports random and exponential back-off delays.
+ * <p/>
+ * If {@code randomBackOff} is enabled and a value is supplied for {@code 
retryDelay} the value will be ignored.
+ * <p/>
+ * If {@code randomBackOff} is enabled and no value is set for {@code 
maximumRetryDelay}, a default value of 1000ms will
+ * be used, the random delay will be between 0 and 1000 milliseconds.
+ * <p/>
+ * If both {@code randomBackOff} and {@code exponentialBackOff} are enabled, 
{@code exponentialBackOff} will take precedence.
+ * <p/>
+ * If {@code exponentialBackOff} is enabled and a value is set for {@code 
maximumRetryDelay}, the retry delay will keep
+ * doubling in value until it reaches or exceeds {@code maximumRetryDelay}. 
After it has reached or exceeded {@code maximumRetryDelay}
+ * the value of {@code maximumRetryDelay} will be used as the retry delay.
+ * <p/>
+ * If both {@code exponentialBackOff} and {@code randomBackOff} are disabled, 
the value of {@code retryDelay} will be used
+ * as the retry delay and remain constant through all the retry attempts.
+ * <p/>
+ * If the value of {@code maximumRetries} is set above zero, retry attempts 
will stop at the value specified.
+ * <p/>
+ * The default behaviour of this policy is to retry forever and exponentially 
increase the back-off delay starting with 50ms.
+ *
+ * @version
+ */
+public class OptimisticLockRetryPolicy {
+    private static final long DEFAULT_MAXIMUM_RETRY_DELAY = 1000L;
+
+    private int maximumRetries;
+    private long retryDelay = 50L;
+    private long maximumRetryDelay;
+    private boolean exponentialBackOff = true;
+    private boolean randomBackOff;
+
+    public OptimisticLockRetryPolicy() {
+    }
+
+    public boolean shouldRetry(final int retryCounter) {
+        return maximumRetries <= 0 || retryCounter < maximumRetries;
+    }
+
+    public void doDelay(final int retryCounter) throws InterruptedException {
+        if (retryDelay > 0 || randomBackOff) {
+            long sleepFor;
+            sleepFor = exponentialBackOff ? (retryDelay << retryCounter)
+                    : (randomBackOff ? new 
Random().nextInt((int)(maximumRetryDelay > 0 ? maximumRetryDelay : 
DEFAULT_MAXIMUM_RETRY_DELAY)) : retryDelay);
+            if (maximumRetryDelay > 0 && sleepFor > maximumRetryDelay) {
+                sleepFor = maximumRetryDelay;
+            }
+            Thread.sleep(sleepFor);
+        }
+    }
+
+    public int getMaximumRetries() {
+        return maximumRetries;
+    }
+
+    public void setMaximumRetries(int maximumRetries) {
+        this.maximumRetries = maximumRetries;
+    }
+
+    public OptimisticLockRetryPolicy maximumRetries(int maximumRetries) {
+        setMaximumRetries(maximumRetries);
+        return this;
+    }
+
+    public long getRetryDelay() {
+        return retryDelay;
+    }
+
+    public void setRetryDelay(long retryDelay) {
+        this.retryDelay = retryDelay;
+    }
+
+    public OptimisticLockRetryPolicy retryDelay(long retryDelay) {
+        setRetryDelay(retryDelay);
+        return this;
+    }
+
+    public long getMaximumRetryDelay() {
+        return maximumRetryDelay;
+    }
+
+    public void setMaximumRetryDelay(long maximumRetryDelay) {
+        this.maximumRetryDelay = maximumRetryDelay;
+    }
+
+    public OptimisticLockRetryPolicy maximumRetryDelay(long maximumRetryDelay) 
{
+        setMaximumRetryDelay(maximumRetryDelay);
+        return this;
+    }
+
+    public boolean isExponentialBackOff() {
+        return exponentialBackOff;
+    }
+
+    public void setExponentialBackOff(boolean exponentialBackOff) {
+        this.exponentialBackOff = exponentialBackOff;
+    }
+
+    public OptimisticLockRetryPolicy exponentialBackOff() {
+        setExponentialBackOff(true);
+        return this;
+    }
+
+    public boolean isRandomBackOff() {
+        return randomBackOff;
+    }
+
+    public void setRandomBackOff(boolean randomBackOff) {
+        this.randomBackOff = randomBackOff;
+    }
+
+    public OptimisticLockRetryPolicy randomBackOff() {
+        setRandomBackOff(true);
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new 
StringBuilder("OptimisticLockRetryPolicy[");
+        sb.append("maximumRetries=").append(maximumRetries);
+        sb.append(", retryDelay=").append(retryDelay);
+        sb.append(", maximumRetryDelay=").append(maximumRetryDelay);
+        sb.append(", exponentialBackOff=").append(exponentialBackOff);
+        sb.append(", randomBackOff=").append(randomBackOff);
+        sb.append(']');
+        return sb.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/fabc8682/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
----------------------------------------------------------------------
diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index 
b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
index a877af7..1e32f42 100644
--- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
+++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
@@ -44,6 +44,7 @@ MarshalDefinition
 MulticastDefinition
 OnCompletionDefinition
 OnExceptionDefinition
+OptimisticLockRetryPolicyDefinition
 OptionalIdentifiedDefinition
 OtherwiseDefinition
 PackageScanDefinition

http://git-wip-us.apache.org/repos/asf/camel/blob/fabc8682/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
index 4d089a7..44caad2 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
@@ -27,7 +27,6 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
 import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
-import org.junit.Test;
 
 /**
  * @version
@@ -39,7 +38,6 @@ public class DistributedConcurrentPerCorrelationKeyTest 
extends AbstractDistribu
     private int size = 200;
     private final String uri = "direct:start";
 
-    @Test
     public void testAggregateConcurrentPerCorrelationKey() throws Exception {
         ExecutorService service = Executors.newFixedThreadPool(50);
         List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();

http://git-wip-us.apache.org/repos/asf/camel/blob/fabc8682/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedOptimisticLockFailingTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedOptimisticLockFailingTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedOptimisticLockFailingTest.java
new file mode 100644
index 0000000..810eb18
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedOptimisticLockFailingTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
+import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
+
+/**
+ * @version
+ */
+public class DistributedOptimisticLockFailingTest extends 
AbstractDistributedTest {
+
+    private static final class AlwaysFailingRepository extends 
MemoryAggregationRepository {
+        @Override
+        public Exchange add(CamelContext camelContext, String key, Exchange 
oldExchange, Exchange newExchange) {
+            throw new OptimisticLockingException();
+        }
+    }
+
+    private static final class EverySecondOneFailsRepository extends 
MemoryAggregationRepository {
+        private AtomicInteger counter = new AtomicInteger();
+        private EverySecondOneFailsRepository() {
+            super(true);
+        }
+        @Override
+        public Exchange add(CamelContext camelContext, String key, Exchange 
oldExchange, Exchange newExchange) {
+            int count = counter.incrementAndGet();
+            if (count % 2 == 0) {
+                throw new OptimisticLockingException();
+            } else {
+                return super.add(camelContext, key, oldExchange, newExchange);
+            }
+        }
+    }
+    private EverySecondOneFailsRepository sharedRepository = new 
EverySecondOneFailsRepository();
+
+    public void testAlwaysFails() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+        MockEndpoint mock2 = getMockEndpoint2("mock:result");
+        mock2.expectedMessageCount(0);
+
+        try {
+            template.sendBodyAndHeader("direct:fails", "hello world", "id", 1);
+            fail("Should throw CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(CamelExchangeException.class, e.getCause());
+            
assertIsInstanceOf(OptimisticLockingAggregationRepository.OptimisticLockingException.class,
 e.getCause().getCause());
+        }
+
+        try {
+            template2.sendBodyAndHeader("direct:fails", "hello world", "id", 
1);
+            fail("Should throw CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(CamelExchangeException.class, e.getCause());
+            
assertIsInstanceOf(OptimisticLockingAggregationRepository.OptimisticLockingException.class,
 e.getCause().getCause());
+        }
+
+        mock.assertIsSatisfied();
+        mock2.assertIsSatisfied();
+    }
+
+    public void testEverySecondOneFails() throws Exception {
+        int size = 200;
+        ExecutorService service = Executors.newFixedThreadPool(50);
+        List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
+        for (int i = 0; i < size; i++) {
+            final int id = i % 25;
+            final int choice = i % 2;
+            final int count = i;
+            tasks.add(new Callable<Object>() {
+                public Object call() throws Exception {
+                    if (choice == 0) {
+                        template.sendBodyAndHeader("direct:everysecondone", "" 
+ count, "id", id);
+                    } else {
+                        template2.sendBodyAndHeader("direct:everysecondone", 
"" + count, "id", id);
+                    }
+                    return null;
+                }
+            });
+        }
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        MockEndpoint mock2 = getMockEndpoint2("mock:result");
+
+        // submit all tasks
+        service.invokeAll(tasks);
+        service.shutdown();
+        service.awaitTermination(10, TimeUnit.SECONDS);
+
+        int contextCount = mock.getReceivedCounter();
+        int context2Count = mock2.getReceivedCounter();
+
+        assertEquals(25, contextCount + context2Count);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:fails")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .aggregationRepository(new AlwaysFailingRepository())
+                        .optimisticLocking()
+                        .optimisticLockRetryPolicy(new 
OptimisticLockRetryPolicy().maximumRetries(5))
+                        .completionSize(2)
+                        .to("mock:result");
+
+                from("direct:everysecondone")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .aggregationRepository(sharedRepository)
+                        .optimisticLocking()
+                        .completionSize(8)
+                        .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/fabc8682/camel-core/src/test/java/org/apache/camel/processor/aggregator/OptimisticLockRetryPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/OptimisticLockRetryPolicyTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/OptimisticLockRetryPolicyTest.java
new file mode 100644
index 0000000..cbea08d
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/OptimisticLockRetryPolicyTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import junit.framework.TestCase;
+import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
+
+/**
+ * @version
+ */
+public class OptimisticLockRetryPolicyTest extends TestCase {
+
+    private static long precision = 5L; // give or take 5ms
+
+    public void testRandomBackOff() throws Exception {
+        OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy();
+        policy.setRandomBackOff(true);
+        policy.setExponentialBackOff(false);
+        policy.setMaximumRetryDelay(500L);
+
+        for (int i = 0; i < 10; i++) {
+            long start = System.currentTimeMillis();
+            policy.doDelay(i);
+            long elapsed = System.currentTimeMillis() - start;
+            assertTrue(elapsed <= policy.getMaximumRetryDelay() + precision && 
elapsed >= 0);
+        }
+    }
+
+    public void testExponentialBackOff() throws Exception {
+        OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy();
+        policy.setRandomBackOff(false);
+        policy.setExponentialBackOff(true);
+        policy.setMaximumRetryDelay(0L);
+        policy.setRetryDelay(50L);
+
+        for (int i = 0; i < 10; i++) {
+            long start = System.currentTimeMillis();
+            policy.doDelay(i);
+            long elapsed = System.currentTimeMillis() - start;
+            assertTrue(elapsed >= (policy.getRetryDelay() << i) - precision);
+            assertTrue(elapsed <= (policy.getRetryDelay() << i) + precision);
+        }
+    }
+
+    public void testExponentialBackOffMaximumRetryDelay() throws Exception {
+        OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy();
+        policy.setRandomBackOff(false);
+        policy.setExponentialBackOff(true);
+        policy.setMaximumRetryDelay(200L);
+        policy.setRetryDelay(50L);
+
+        for (int i = 0; i < 10; i++) {
+            long start = System.currentTimeMillis();
+            policy.doDelay(i);
+            long elapsed = System.currentTimeMillis() - start;
+            switch (i) {
+            case 0:
+                assertTrue(elapsed <= 50 + precision);
+                assertTrue(elapsed >= 50 - precision);
+                break;
+            case 1:
+                assertTrue(elapsed <= 100 + precision);
+                assertTrue(elapsed >= 100 - precision);
+                break;
+            default:
+                assertTrue(elapsed <= 200 + precision);
+                assertTrue(elapsed >= 200 - precision);
+                break;
+            }
+        }
+    }
+
+    public void testRetryDelay() throws Exception {
+        OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy();
+        policy.setRandomBackOff(false);
+        policy.setExponentialBackOff(false);
+        policy.setMaximumRetryDelay(0L);
+        policy.setRetryDelay(50L);
+
+        for (int i = 0; i < 10; i++) {
+            long start = System.currentTimeMillis();
+            policy.doDelay(i);
+            long elapsed = System.currentTimeMillis() - start;
+            assertTrue(elapsed <= policy.getRetryDelay() + precision);
+            assertTrue(elapsed >= policy.getRetryDelay() - precision);
+        }
+    }
+
+    public void testMaximumRetries() throws Exception {
+        OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy();
+        policy.setRandomBackOff(false);
+        policy.setExponentialBackOff(false);
+        policy.setMaximumRetryDelay(0L);
+        policy.setMaximumRetries(2);
+        policy.setRetryDelay(50L);
+
+        for (int i = 0; i < 10; i++) {
+            switch (i) {
+            case 0:
+            case 1:
+                assertTrue(policy.shouldRetry(i));
+                break;
+            default:
+                assertFalse(policy.shouldRetry(i));
+            }
+        }
+    }
+}
\ No newline at end of file

Reply via email to