Author: davsclaus Date: Mon Feb 18 17:17:29 2013 New Revision: 1447397 URL: http://svn.apache.org/r1447397 Log: CAMEL-6042: Polished. Thanks to Aaron Whiteside for the patch.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java?rev=1447397&r1=1447396&r2=1447397&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java Mon Feb 18 17:17:29 2013 @@ -22,14 +22,18 @@ import org.apache.camel.Exchange; * A specialized {@link AggregationStrategy} which gets a callback when the aggregated {@link Exchange} fails to add * in the {@link org.apache.camel.spi.OptimisticLockingAggregationRepository} because of * an {@link org.apache.camel.spi.OptimisticLockingAggregationRepository.OptimisticLockingException}. - * + * <p/> * Please note that when aggregating {@link Exchange}'s to be careful not to modify and return the {@code oldExchange} - * from the {@code aggregate()} method. If you are using the default MemoryAggregationRepository this will mean you have - * modified the value of an object already referenced/stored by the MemoryAggregationRepository. This makes it impossible - * for optimistic locking to work correctly with the MemoryAggregationRepository. - * + * from the {@link AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} method. + * If you are using the default MemoryAggregationRepository this will mean you have modified the value of an object + * already referenced/stored by the MemoryAggregationRepository. This makes it impossible for optimistic locking + * to work correctly with the MemoryAggregationRepository. + * <p/> * You should instead return either the new {@code newExchange} or a completely new instance of {@link Exchange}. This - * is due to the nature of how the underlying {@link java.util.concurrent.ConcurrentHashMap} performs CAS operations on the value identity. + * is due to the nature of how the underlying {@link java.util.concurrent.ConcurrentHashMap} performs CAS operations + * on the value identity. + * + * @see java.util.concurrent.ConcurrentHashMap * * @version */ Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java?rev=1447397&r1=1447396&r2=1447397&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java Mon Feb 18 17:17:29 2013 @@ -26,6 +26,8 @@ import org.apache.camel.Exchange; * If the underlying implementation cannot perform optimistic locking, it should * not implement this interface. * + * @see org.apache.camel.processor.aggregate.MemoryAggregationRepository + * * @version */ public interface OptimisticLockingAggregationRepository extends AggregationRepository { @@ -40,8 +42,27 @@ public interface OptimisticLockingAggreg /** * Add the given {@link org.apache.camel.Exchange} under the correlation key. * <p/> - * Will perform optimistic locking to replace expected existing exchange with - * the new supplied exchange. + * Will perform optimistic locking to replace expected existing exchange with the new supplied exchange. + * <p/> + * If the {@code oldExchange} is null the underlying implementation is to assume this is the very first Exchange for the + * supplied correlation key. When the implementation comes to store to the Exchange if there is already an existing + * Exchange present for this correlation key the implementation should throw an OptimisticLockingException. + * <p/> + * If the {@code oldExchange} is not null the underlying implementation should use it to compare with the existing exchange + * when doing an atomic compare-and-set/swap operation. + * <p/> + * The implementation may achieve this by storing a version identifier in the Exchange as a parameter. Set before + * returning from {@link AggregationRepository#get(org.apache.camel.CamelContext, String)}} and retrieved from the + * exchange when passed to {@link AggregationRepository#add(org.apache.camel.CamelContext, String, org.apache.camel.Exchange)}. + * <p/> + * Note: The {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} is an exception to this recommendation. + * It uses the {@code oldExchange}'s Object identify to perform it's compare-and-set/swap operation, instead of a version + * parameter. This is not the recommended approach, and should be avoided. + * <p/> + * The {@link org.apache.camel.processor.aggregate.AggregateProcessor} will ensure that the exchange received from + * {@link OptimisticLockingAggregationRepository#get(org.apache.camel.CamelContext, String)} is passed as {@code oldExchange}, + * and that the aggregated exchange received from the {@link org.apache.camel.processor.aggregate.AggregationStrategy} + * is passed as the {@code newExchange}. * * @param camelContext the current CamelContext * @param key the correlation key @@ -53,7 +74,10 @@ public interface OptimisticLockingAggreg Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingException; /** - * + * Removes the given Exchange when both the supplied key and Exchange are present in the repository. If the supplied Exchange + * does not match the Exchange actually stored against the key this method should throw an OptimisticLockingException + * to indicate that the value of the correlation key has changed from the expected value. + * <p/> * @param camelContext the current CamelContext * @param key the correlation key * @param exchange the exchange to remove Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java?rev=1447397&r1=1447396&r2=1447397&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java Mon Feb 18 17:17:29 2013 @@ -19,6 +19,7 @@ package org.apache.camel.processor.aggre import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.util.ServiceHelper; @@ -33,13 +34,15 @@ public abstract class AbstractDistribute public void setUp() throws Exception { super.setUp(); + context.setUseMDCLogging(true); context2 = new DefaultCamelContext(); + context2.setUseMDCLogging(true); template2 = context2.createProducerTemplate(); ServiceHelper.startServices(template2, context2); // add routes after CamelContext has been started - context2.addRoutes(createRouteBuilder()); + context2.addRoutes(createRouteBuilder2()); } public void tearDown() throws Exception { @@ -51,4 +54,8 @@ public abstract class AbstractDistribute protected MockEndpoint getMockEndpoint2(String uri) { return context2.getEndpoint(uri, MockEndpoint.class); } + + protected RouteBuilder createRouteBuilder2() throws Exception { + return createRouteBuilder(); + } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java?rev=1447397&r1=1447396&r2=1447397&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java Mon Feb 18 17:17:29 2013 @@ -18,6 +18,7 @@ package org.apache.camel.processor.aggre import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.MemoryAggregationRepository; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; /** @@ -27,16 +28,37 @@ import org.apache.camel.processor.aggreg */ public class DistributedCompletionIntervalTest extends AbstractDistributedTest { - public void testAggregateInterval() throws Exception { + private MemoryAggregationRepository sharedAggregationRepository = new MemoryAggregationRepository(true); + + public void testCamelContext1Wins() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Message 19"); + MockEndpoint mock2 = getMockEndpoint2("mock:result"); + mock2.expectedMessageCount(0); + + // ensure messages are send after the 1s + Thread.sleep(2000); + sendMessages(); + + mock.assertIsSatisfied(); + mock2.assertIsSatisfied(); + } + + public void testCamelContext2Wins() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); MockEndpoint mock2 = getMockEndpoint2("mock:result"); - // by default the use latest aggregation strategy is used so we get message 18 and message 19 - mock.expectedBodiesReceived("Message 18"); mock2.expectedBodiesReceived("Message 19"); // ensure messages are send after the 1s Thread.sleep(2000); - + sendMessages(); + + mock2.assertIsSatisfied(); + mock.assertIsSatisfied(); + } + + private void sendMessages() { for (int i = 0; i < 20; i++) { int choice = i % 2; if (choice == 0) { @@ -45,9 +67,6 @@ public class DistributedCompletionInterv template2.sendBodyAndHeader("direct:start", "Message " + i, "id", "1"); } } - - mock.assertIsSatisfied(); - mock2.assertIsSatisfied(); } @Override @@ -58,8 +77,28 @@ public class DistributedCompletionInterv // START SNIPPET: e1 from("direct:start") .aggregate(header("id"), new UseLatestAggregationStrategy()) + .aggregationRepository(sharedAggregationRepository) + .optimisticLocking() + // trigger completion every 5th second + .completionInterval(getName().equals("testCamelContext1Wins") ? 5000 : 10000) + .to("mock:result"); + // END SNIPPET: e1 + } + }; + } + + @Override + protected RouteBuilder createRouteBuilder2() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + .aggregate(header("id"), new UseLatestAggregationStrategy()) + .aggregationRepository(sharedAggregationRepository) + .optimisticLocking() // trigger completion every 5th second - .completionInterval(5000) + .completionInterval(getName().equals("testCamelContext1Wins") ? 10000 : 5000) .to("mock:result"); // END SNIPPET: e1 }