Updated Branches: refs/heads/camel-2.11.x c4c42a134 -> 23b48a8a0
CAMEL-6246: Added optimistic lock retry policy to aggregate eip. Thanks to Aaron Whiteside for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/23b48a8a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/23b48a8a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/23b48a8a Branch: refs/heads/camel-2.11.x Commit: 23b48a8a0324b0ca11acb65ac4722f720d092a62 Parents: c4c42a1 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat May 11 11:05:12 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat May 11 11:08:15 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/model/AggregateDefinition.java | 34 ++++- .../model/OptimisticLockRetryPolicyDefinition.java | 140 ++++++++++++++ .../processor/aggregate/AggregateProcessor.java | 27 ++- .../aggregate/OptimisticLockRetryPolicy.java | 148 ++++++++++++++ .../resources/org/apache/camel/model/jaxb.index | 1 + ...DistributedConcurrentPerCorrelationKeyTest.java | 2 - .../DistributedOptimisticLockFailingTest.java | 149 +++++++++++++++ .../aggregator/OptimisticLockRetryPolicyTest.java | 122 ++++++++++++ 8 files changed, 612 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/23b48a8a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index d8f26d8..fce58e5 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -38,6 +38,7 @@ import org.apache.camel.processor.UnitOfWorkProcessor; import 
org.apache.camel.processor.aggregate.AggregateProcessor; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy; +import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.concurrent.SynchronousExecutorService; @@ -58,6 +59,8 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition private ExpressionSubElementDefinition completionTimeoutExpression; @XmlElement(name = "completionSize") private ExpressionSubElementDefinition completionSizeExpression; + @XmlElement(name = "optimisticLockRetryPolicy") + private OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition; @XmlTransient private ExpressionDefinition expression; @XmlElementRef @@ -70,6 +73,8 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition private ScheduledExecutorService timeoutCheckerExecutorService; @XmlTransient private AggregationRepository aggregationRepository; + @XmlTransient + private OptimisticLockRetryPolicy optimisticLockRetryPolicy; @XmlAttribute private Boolean parallelProcessing; @XmlAttribute @@ -237,7 +242,13 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition if (getForceCompletionOnStop() != null) { answer.setForceCompletionOnStop(getForceCompletionOnStop()); } - + if (optimisticLockRetryPolicy == null) { + if (getOptimisticLockRetryPolicyDefinition() != null) { + answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy()); + } + } else { + answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy); + } return answer; } @@ -316,6 +327,22 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition this.completionSize = completionSize; } + public OptimisticLockRetryPolicyDefinition 
getOptimisticLockRetryPolicyDefinition() { + return optimisticLockRetryPolicyDefinition; + } + + public void setOptimisticLockRetryPolicyDefinition(OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition) { + this.optimisticLockRetryPolicyDefinition = optimisticLockRetryPolicyDefinition; + } + + public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() { + return optimisticLockRetryPolicy; + } + + public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) { + this.optimisticLockRetryPolicy = optimisticLockRetryPolicy; + } + public Long getCompletionInterval() { return completionInterval; } @@ -738,6 +765,11 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition setOptimisticLocking(true); return this; } + + public AggregateDefinition optimisticLockRetryPolicy(OptimisticLockRetryPolicy policy) { + setOptimisticLockRetryPolicy(policy); + return this; + } public AggregateDefinition executorService(ExecutorService executorService) { setExecutorService(executorService); http://git-wip-us.apache.org/repos/asf/camel/blob/23b48a8a/camel-core/src/main/java/org/apache/camel/model/OptimisticLockRetryPolicyDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/OptimisticLockRetryPolicyDefinition.java b/camel-core/src/main/java/org/apache/camel/model/OptimisticLockRetryPolicyDefinition.java new file mode 100644 index 0000000..87b449d --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/model/OptimisticLockRetryPolicyDefinition.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; + +/** + * Represents an XML <optimisticLockRetryPolicy/> element + * + * @version + */ +@XmlRootElement(name = "optimisticLockRetryPolicy") +@XmlAccessorType(XmlAccessType.FIELD) +public class OptimisticLockRetryPolicyDefinition { + @XmlAttribute + private Integer maximumRetries; + @XmlAttribute + private Long retryDelay; + @XmlAttribute + private Long maximumRetryDelay; + @XmlAttribute + private Boolean exponentialBackOff; + @XmlAttribute + private Boolean randomBackOff; + + public OptimisticLockRetryPolicyDefinition() { + } + + public OptimisticLockRetryPolicy createOptimisticLockRetryPolicy() { + OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy(); + if (maximumRetries != null) { + policy.setMaximumRetries(maximumRetries); + } + if (retryDelay != null) { + policy.setRetryDelay(retryDelay); + } + if (maximumRetryDelay != null) { + policy.setMaximumRetryDelay(maximumRetryDelay); + } + if (exponentialBackOff != null) { + policy.setExponentialBackOff(exponentialBackOff); + } + if (randomBackOff != null) { + policy.setRandomBackOff(randomBackOff); + } + return policy; + 
} + + public OptimisticLockRetryPolicyDefinition maximumRetries(int maximumRetries) { + setMaximumRetries(maximumRetries); + return this; + } + + public Integer getMaximumRetries() { + return maximumRetries; + } + + public void setMaximumRetries(Integer maximumRetries) { + this.maximumRetries = maximumRetries; + } + + public OptimisticLockRetryPolicyDefinition retryDelay(long retryDelay) { + setRetryDelay(retryDelay); + return this; + } + + public Long getRetryDelay() { + return retryDelay; + } + + public void setRetryDelay(Long retryDelay) { + this.retryDelay = retryDelay; + } + + public OptimisticLockRetryPolicyDefinition maximumRetryDelay(long maximumRetryDelay) { + setMaximumRetryDelay(maximumRetryDelay); + return this; + } + + public Long getMaximumRetryDelay() { + return maximumRetryDelay; + } + + public void setMaximumRetryDelay(Long maximumRetryDelay) { + this.maximumRetryDelay = maximumRetryDelay; + } + + public OptimisticLockRetryPolicyDefinition exponentialBackOff() { + return exponentialBackOff(true); + } + + public OptimisticLockRetryPolicyDefinition exponentialBackOff(boolean exponentialBackOff) { + setExponentialBackOff(exponentialBackOff); + return this; + } + + public Boolean getExponentialBackOff() { + return exponentialBackOff; + } + + public void setExponentialBackOff(Boolean exponentialBackOff) { + this.exponentialBackOff = exponentialBackOff; + } + + public OptimisticLockRetryPolicyDefinition randomBackOff() { + return randomBackOff(true); + } + + public OptimisticLockRetryPolicyDefinition randomBackOff(boolean randomBackOff) { + setRandomBackOff(randomBackOff); + return this; + } + + public Boolean getRandomBackOff() { + return randomBackOff; + } + + public void setRandomBackOff(Boolean randomBackOff) { + this.randomBackOff = randomBackOff; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/23b48a8a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index c113c60..46ef290 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -90,6 +89,7 @@ public class AggregateProcessor extends ServiceSupport implements Processor, Nav private final Expression correlationExpression; private final ExecutorService executorService; private final boolean shutdownExecutorService; + private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new OptimisticLockRetryPolicy(); private ScheduledExecutorService timeoutCheckerExecutorService; private boolean shutdownTimeoutCheckerExecutorService; private ScheduledExecutorService recoverService; @@ -194,24 +194,27 @@ public class AggregateProcessor extends ServiceSupport implements Processor, Nav // when optimist locking is enabled we keep trying until we succeed if (optimisticLocking) { - boolean done = false; + boolean exhaustedRetries = true; int attempt = 0; - while (!done) { + do { attempt++; // copy exchange, and do not share the unit of work // the aggregated output runs in another unit of work Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); try { doAggregation(key, copy); - done = true; + exhaustedRetries = false; + break; } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException 
while trying to add() key: {} and exchange: {}", new Object[]{attempt, aggregationRepository, key, copy, e}); + optimisticLockRetryPolicy.doDelay(attempt); } - // use a little random delay to avoid being to aggressive when retrying, and avoid potential clashing - int ran = new Random().nextInt(1000); - LOG.trace("Sleeping for {} millis before attempting again", ran); - Thread.sleep(ran); + } while (optimisticLockRetryPolicy.shouldRetry(attempt)); + + if (exhaustedRetries) { + throw new CamelExchangeException("Exhausted optimistic locking retry attempts, tried " + attempt + " times", exchange, + new OptimisticLockingAggregationRepository.OptimisticLockingException()); } } else { // copy exchange, and do not share the unit of work @@ -689,6 +692,14 @@ public class AggregateProcessor extends ServiceSupport implements Processor, Nav this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService; } + public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) { + this.optimisticLockRetryPolicy = optimisticLockRetryPolicy; + } + + public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() { + return optimisticLockRetryPolicy; + } + /** * On completion task which keeps the booking of the in progress up to date */ http://git-wip-us.apache.org/repos/asf/camel/blob/23b48a8a/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockRetryPolicy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockRetryPolicy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockRetryPolicy.java new file mode 100644 index 0000000..3ca3081 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockRetryPolicy.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate; + +import java.util.Random; + +/** + * Class to control how failed optimistic locks are tried. This policy supports random and exponential back-off delays. + * <p/> + * If {@code randomBackOff} is enabled and a value is supplied for {@code retryDelay} the value will be ignored. + * <p/> + * If {@code randomBackOff} is enabled and no value is set for {@code maximumRetryDelay}, a default value of 1000ms will + * be used, the random delay will be between 0 and 1000 milliseconds. + * <p/> + * If both {@code randomBackOff} and {@code exponentialBackOff} are enabled, {@code exponentialBackOff} will take precedence. + * <p/> + * If {@code exponentialBackOff} is enabled and a value is set for {@code maximumRetryDelay}, the retry delay will keep + * doubling in value until it reaches or exceeds {@code maximumRetryDelay}. After it has reached or exceeded {@code maximumRetryDelay} + * the value of {@code maximumRetryDelay} will be used as the retry delay. + * <p/> + * If both {@code exponentialBackOff} and {@code randomBackOff} are disabled, the value of {@code retryDelay} will be used + * as the retry delay and remain constant through all the retry attempts. 
+ * <p/> + * If the value of {@code maximumRetries} is set above zero, retry attempts will stop at the value specified. + * <p/> + * The default behaviour of this policy is to retry forever and exponentially increase the back-off delay starting with 50ms. + * + * @version + */ +public class OptimisticLockRetryPolicy { + private static final long DEFAULT_MAXIMUM_RETRY_DELAY = 1000L; + + private int maximumRetries; + private long retryDelay = 50L; + private long maximumRetryDelay; + private boolean exponentialBackOff = true; + private boolean randomBackOff; + + public OptimisticLockRetryPolicy() { + } + + public boolean shouldRetry(final int retryCounter) { + return maximumRetries <= 0 || retryCounter < maximumRetries; + } + + public void doDelay(final int retryCounter) throws InterruptedException { + if (retryDelay > 0 || randomBackOff) { + long sleepFor; + sleepFor = exponentialBackOff ? (retryDelay << retryCounter) + : (randomBackOff ? new Random().nextInt((int)(maximumRetryDelay > 0 ? 
maximumRetryDelay : DEFAULT_MAXIMUM_RETRY_DELAY)) : retryDelay); + if (maximumRetryDelay > 0 && sleepFor > maximumRetryDelay) { + sleepFor = maximumRetryDelay; + } + Thread.sleep(sleepFor); + } + } + + public int getMaximumRetries() { + return maximumRetries; + } + + public void setMaximumRetries(int maximumRetries) { + this.maximumRetries = maximumRetries; + } + + public OptimisticLockRetryPolicy maximumRetries(int maximumRetries) { + setMaximumRetries(maximumRetries); + return this; + } + + public long getRetryDelay() { + return retryDelay; + } + + public void setRetryDelay(long retryDelay) { + this.retryDelay = retryDelay; + } + + public OptimisticLockRetryPolicy retryDelay(long retryDelay) { + setRetryDelay(retryDelay); + return this; + } + + public long getMaximumRetryDelay() { + return maximumRetryDelay; + } + + public void setMaximumRetryDelay(long maximumRetryDelay) { + this.maximumRetryDelay = maximumRetryDelay; + } + + public OptimisticLockRetryPolicy maximumRetryDelay(long maximumRetryDelay) { + setMaximumRetryDelay(maximumRetryDelay); + return this; + } + + public boolean isExponentialBackOff() { + return exponentialBackOff; + } + + public void setExponentialBackOff(boolean exponentialBackOff) { + this.exponentialBackOff = exponentialBackOff; + } + + public OptimisticLockRetryPolicy exponentialBackOff() { + setExponentialBackOff(true); + return this; + } + + public boolean isRandomBackOff() { + return randomBackOff; + } + + public void setRandomBackOff(boolean randomBackOff) { + this.randomBackOff = randomBackOff; + } + + public OptimisticLockRetryPolicy randomBackOff() { + setRandomBackOff(true); + return this; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("OptimisticLockRetryPolicy["); + sb.append("maximumRetries=").append(maximumRetries); + sb.append(", retryDelay=").append(retryDelay); + sb.append(", maximumRetryDelay=").append(maximumRetryDelay); + sb.append(", 
exponentialBackOff=").append(exponentialBackOff); + sb.append(", randomBackOff=").append(randomBackOff); + sb.append(']'); + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/23b48a8a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index ---------------------------------------------------------------------- diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index index a877af7..1e32f42 100644 --- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index +++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index @@ -44,6 +44,7 @@ MarshalDefinition MulticastDefinition OnCompletionDefinition OnExceptionDefinition +OptimisticLockRetryPolicyDefinition OptionalIdentifiedDefinition OtherwiseDefinition PackageScanDefinition http://git-wip-us.apache.org/repos/asf/camel/blob/23b48a8a/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java index 4d089a7..44caad2 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java @@ -27,7 +27,6 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.BodyInAggregatingStrategy; import org.apache.camel.processor.aggregate.MemoryAggregationRepository; -import org.junit.Test; /** * @version @@ -39,7 +38,6 @@ public class 
DistributedConcurrentPerCorrelationKeyTest extends AbstractDistribu private int size = 200; private final String uri = "direct:start"; - @Test public void testAggregateConcurrentPerCorrelationKey() throws Exception { ExecutorService service = Executors.newFixedThreadPool(50); List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(); http://git-wip-us.apache.org/repos/asf/camel/blob/23b48a8a/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedOptimisticLockFailingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedOptimisticLockFailingTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedOptimisticLockFailingTest.java new file mode 100644 index 0000000..810eb18 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedOptimisticLockFailingTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelExchangeException; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.BodyInAggregatingStrategy; +import org.apache.camel.processor.aggregate.MemoryAggregationRepository; +import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; +import org.apache.camel.spi.OptimisticLockingAggregationRepository; + +/** + * @version + */ +public class DistributedOptimisticLockFailingTest extends AbstractDistributedTest { + + private static final class AlwaysFailingRepository extends MemoryAggregationRepository { + @Override + public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) { + throw new OptimisticLockingException(); + } + } + + private static final class EverySecondOneFailsRepository extends MemoryAggregationRepository { + private AtomicInteger counter = new AtomicInteger(); + private EverySecondOneFailsRepository() { + super(true); + } + @Override + public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) { + int count = counter.incrementAndGet(); + if (count % 2 == 0) { + throw new OptimisticLockingException(); + } else { + return super.add(camelContext, key, oldExchange, newExchange); + } + } + } + private EverySecondOneFailsRepository sharedRepository = new EverySecondOneFailsRepository(); + + public void testAlwaysFails() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + 
mock.expectedMessageCount(0); + MockEndpoint mock2 = getMockEndpoint2("mock:result"); + mock2.expectedMessageCount(0); + + try { + template.sendBodyAndHeader("direct:fails", "hello world", "id", 1); + fail("Should throw CamelExecutionException"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(CamelExchangeException.class, e.getCause()); + assertIsInstanceOf(OptimisticLockingAggregationRepository.OptimisticLockingException.class, e.getCause().getCause()); + } + + try { + template2.sendBodyAndHeader("direct:fails", "hello world", "id", 1); + fail("Should throw CamelExecutionException"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(CamelExchangeException.class, e.getCause()); + assertIsInstanceOf(OptimisticLockingAggregationRepository.OptimisticLockingException.class, e.getCause().getCause()); + } + + mock.assertIsSatisfied(); + mock2.assertIsSatisfied(); + } + + public void testEverySecondOneFails() throws Exception { + int size = 200; + ExecutorService service = Executors.newFixedThreadPool(50); + List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(); + for (int i = 0; i < size; i++) { + final int id = i % 25; + final int choice = i % 2; + final int count = i; + tasks.add(new Callable<Object>() { + public Object call() throws Exception { + if (choice == 0) { + template.sendBodyAndHeader("direct:everysecondone", "" + count, "id", id); + } else { + template2.sendBodyAndHeader("direct:everysecondone", "" + count, "id", id); + } + return null; + } + }); + } + + MockEndpoint mock = getMockEndpoint("mock:result"); + MockEndpoint mock2 = getMockEndpoint2("mock:result"); + + // submit all tasks + service.invokeAll(tasks); + service.shutdown(); + service.awaitTermination(10, TimeUnit.SECONDS); + + int contextCount = mock.getReceivedCounter(); + int context2Count = mock2.getReceivedCounter(); + + assertEquals(25, contextCount + context2Count); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return 
new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:fails") + .aggregate(header("id"), new BodyInAggregatingStrategy()) + .aggregationRepository(new AlwaysFailingRepository()) + .optimisticLocking() + .optimisticLockRetryPolicy(new OptimisticLockRetryPolicy().maximumRetries(5)) + .completionSize(2) + .to("mock:result"); + + from("direct:everysecondone") + .aggregate(header("id"), new BodyInAggregatingStrategy()) + .aggregationRepository(sharedRepository) + .optimisticLocking() + .completionSize(8) + .to("mock:result"); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/23b48a8a/camel-core/src/test/java/org/apache/camel/processor/aggregator/OptimisticLockRetryPolicyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/OptimisticLockRetryPolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/OptimisticLockRetryPolicyTest.java new file mode 100644 index 0000000..cbea08d --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/OptimisticLockRetryPolicyTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import junit.framework.TestCase; +import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; + +/** + * @version + */ +public class OptimisticLockRetryPolicyTest extends TestCase { + + private static long precision = 5L; // give or take 5ms + + public void testRandomBackOff() throws Exception { + OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy(); + policy.setRandomBackOff(true); + policy.setExponentialBackOff(false); + policy.setMaximumRetryDelay(500L); + + for (int i = 0; i < 10; i++) { + long start = System.currentTimeMillis(); + policy.doDelay(i); + long elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed <= policy.getMaximumRetryDelay() + precision && elapsed >= 0); + } + } + + public void testExponentialBackOff() throws Exception { + OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy(); + policy.setRandomBackOff(false); + policy.setExponentialBackOff(true); + policy.setMaximumRetryDelay(0L); + policy.setRetryDelay(50L); + + for (int i = 0; i < 10; i++) { + long start = System.currentTimeMillis(); + policy.doDelay(i); + long elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed >= (policy.getRetryDelay() << i) - precision); + assertTrue(elapsed <= (policy.getRetryDelay() << i) + precision); + } + } + + public void testExponentialBackOffMaximumRetryDelay() throws Exception { + OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy(); + policy.setRandomBackOff(false); + policy.setExponentialBackOff(true); + policy.setMaximumRetryDelay(200L); + policy.setRetryDelay(50L); + + for (int i = 0; i < 10; i++) { + long start = System.currentTimeMillis(); + policy.doDelay(i); + long elapsed = System.currentTimeMillis() - start; + switch (i) { + case 0: + assertTrue(elapsed <= 50 + precision); + assertTrue(elapsed >= 50 - precision); 
+ break; + case 1: + assertTrue(elapsed <= 100 + precision); + assertTrue(elapsed >= 100 - precision); + break; + default: + assertTrue(elapsed <= 200 + precision); + assertTrue(elapsed >= 200 - precision); + break; + } + } + } + + public void testRetryDelay() throws Exception { + OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy(); + policy.setRandomBackOff(false); + policy.setExponentialBackOff(false); + policy.setMaximumRetryDelay(0L); + policy.setRetryDelay(50L); + + for (int i = 0; i < 10; i++) { + long start = System.currentTimeMillis(); + policy.doDelay(i); + long elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed <= policy.getRetryDelay() + precision); + assertTrue(elapsed >= policy.getRetryDelay() - precision); + } + } + + public void testMaximumRetries() throws Exception { + OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy(); + policy.setRandomBackOff(false); + policy.setExponentialBackOff(false); + policy.setMaximumRetryDelay(0L); + policy.setMaximumRetries(2); + policy.setRetryDelay(50L); + + for (int i = 0; i < 10; i++) { + switch (i) { + case 0: + case 1: + assertTrue(policy.shouldRetry(i)); + break; + default: + assertFalse(policy.shouldRetry(i)); + } + } + } +} \ No newline at end of file