Author: davsclaus Date: Wed Mar 27 16:20:30 2013 New Revision: 1461691 URL: http://svn.apache.org/r1461691 Log: CAMEL-6042: Aggregate with optimistic locking enabled should wait a little between retry attempts to avoid clashing. Existing unit tests need to be improved.
Modified: camel/trunk/camel-core/pom.xml camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Modified: camel/trunk/camel-core/pom.xml URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/pom.xml?rev=1461691&r1=1461690&r2=1461691&view=diff ============================================================================== --- camel/trunk/camel-core/pom.xml (original) +++ camel/trunk/camel-core/pom.xml Wed Mar 27 16:20:30 2013 @@ -191,6 +191,8 @@ <!-- skip file stress tests as they are intended to run manually --> <excludes> <exclude>org/apache/camel/component/file/stress/**.java</exclude> + <exclude>**/DistributedCompletionIntervalTest.java</exclude> + <exclude>**/DistributedConcurrentPerCorrelationKeyTest.java</exclude> </excludes> <forkedProcessTimeoutInSeconds>3000</forkedProcessTimeoutInSeconds> </configuration> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1461691&r1=1461690&r2=1461691&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Mar 27 16:20:30 2013 @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -191,8 +192,7 @@ public class AggregateProcessor extends throw new ClosedCorrelationKeyException(key, exchange); } - // - // todo: explain optimistic lock handling + // when optimist locking is enabled we keep trying until we succeed if 
(optimisticLocking) { boolean done = false; int attempt = 0; @@ -208,6 +208,10 @@ public class AggregateProcessor extends LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to add() key: {} and exchange: {}", new Object[]{attempt, aggregationRepository, key, copy, e}); } + // use a little random delay to avoid being to aggressive when retrying, and avoid potential clashing + int ran = new Random().nextInt(1000); + LOG.trace("Sleeping for {} millis before attempting again", ran); + Thread.sleep(ran); } } else { // copy exchange, and do not share the unit of work