Author: davsclaus
Date: Mon Feb 18 17:17:29 2013
New Revision: 1447397

URL: http://svn.apache.org/r1447397
Log:
CAMEL-6042: Polished. Thanks to Aaron Whiteside for the patch.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java?rev=1447397&r1=1447396&r2=1447397&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
 Mon Feb 18 17:17:29 2013
@@ -22,14 +22,18 @@ import org.apache.camel.Exchange;
  * A specialized {@link AggregationStrategy} which gets a callback when the 
aggregated {@link Exchange} fails to add
  * in the {@link org.apache.camel.spi.OptimisticLockingAggregationRepository} 
because of
  * an {@link 
org.apache.camel.spi.OptimisticLockingAggregationRepository.OptimisticLockingException}.
- *
+ * <p/>
  * Please note that when aggregating {@link Exchange}'s to be careful not to 
modify and return the {@code oldExchange}
- * from the {@code aggregate()} method. If you are using the default 
MemoryAggregationRepository this will mean you have
- * modified the value of an object already referenced/stored by the 
MemoryAggregationRepository. This makes it impossible
- * for optimistic locking to work correctly with the 
MemoryAggregationRepository.
- *
+ * from the {@link AggregationStrategy#aggregate(org.apache.camel.Exchange, 
org.apache.camel.Exchange)} method.
+ * If you are using the default MemoryAggregationRepository this will mean you 
have modified the value of an object
+ * already referenced/stored by the MemoryAggregationRepository. This makes it 
impossible for optimistic locking
+ * to work correctly with the MemoryAggregationRepository.
+ * <p/>
  * You should instead return either the new {@code newExchange} or a 
completely new instance of {@link Exchange}. This
- * is due to the nature of how the underlying {@link 
java.util.concurrent.ConcurrentHashMap} performs CAS operations on the value 
identity.
+ * is due to the nature of how the underlying {@link 
java.util.concurrent.ConcurrentHashMap} performs CAS operations
+ * on the value identity.
+ *
+ * @see java.util.concurrent.ConcurrentHashMap
  *
  * @version
  */

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java?rev=1447397&r1=1447396&r2=1447397&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
 Mon Feb 18 17:17:29 2013
@@ -26,6 +26,8 @@ import org.apache.camel.Exchange;
  * If the underlying implementation cannot perform optimistic locking, it 
should
  * not implement this interface.
  *
+ * @see org.apache.camel.processor.aggregate.MemoryAggregationRepository
+ *
  * @version
  */
 public interface OptimisticLockingAggregationRepository extends 
AggregationRepository {
@@ -40,8 +42,27 @@ public interface OptimisticLockingAggreg
     /**
      * Add the given {@link org.apache.camel.Exchange} under the correlation 
key.
      * <p/>
-     * Will perform optimistic locking to replace expected existing exchange 
with
-     * the new supplied exchange.
+     * Will perform optimistic locking to replace expected existing exchange 
with the new supplied exchange.
+     * <p/>
+     * If the {@code oldExchange} is null the underlying implementation is to 
assume this is the very first Exchange for the
+     * supplied correlation key. When the implementation comes to store to the 
Exchange if there is already an existing
+     * Exchange present for this correlation key the implementation should 
throw an OptimisticLockingException.
+     * <p/>
+     * If the {@code oldExchange} is not null the underlying implementation 
should use it to compare with the existing exchange
+     * when doing an atomic compare-and-set/swap operation.
+     * <p/>
+     * The implementation may achieve this by storing a version identifier in 
the Exchange as a parameter. Set before
+     * returning from {@link 
AggregationRepository#get(org.apache.camel.CamelContext, String)}} and 
retrieved from the
+     * exchange when passed to {@link 
AggregationRepository#add(org.apache.camel.CamelContext, String, 
org.apache.camel.Exchange)}.
+     * <p/>
+     * Note: The {@link 
org.apache.camel.processor.aggregate.MemoryAggregationRepository} is an 
exception to this recommendation.
+     * It uses the {@code oldExchange}'s Object identify to perform it's 
compare-and-set/swap operation, instead of a version
+     * parameter. This is not the recommended approach, and should be avoided.
+     * <p/>
+     * The {@link org.apache.camel.processor.aggregate.AggregateProcessor} 
will ensure that the exchange received from
+     * {@link 
OptimisticLockingAggregationRepository#get(org.apache.camel.CamelContext, 
String)} is passed as {@code oldExchange},
+     * and that the aggregated exchange received from the {@link 
org.apache.camel.processor.aggregate.AggregationStrategy}
+     * is passed as the {@code newExchange}.
      *
      * @param camelContext   the current CamelContext
      * @param key            the correlation key
@@ -53,7 +74,10 @@ public interface OptimisticLockingAggreg
     Exchange add(CamelContext camelContext, String key, Exchange oldExchange, 
Exchange newExchange) throws OptimisticLockingException;
 
     /**
-     *
+     * Removes the given Exchange when both the supplied key and Exchange are 
present in the repository. If the supplied Exchange
+     * does not match the Exchange actually stored against the key this method 
should throw an OptimisticLockingException
+     * to indicate that the value of the correlation key has changed from the 
expected value.
+     * <p/>
      * @param camelContext   the current CamelContext
      * @param key            the correlation key
      * @param exchange       the exchange to remove

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java?rev=1447397&r1=1447396&r2=1447397&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
 Mon Feb 18 17:17:29 2013
@@ -19,6 +19,7 @@ package org.apache.camel.processor.aggre
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.util.ServiceHelper;
@@ -33,13 +34,15 @@ public abstract class AbstractDistribute
 
     public void setUp() throws Exception {
         super.setUp();
+        context.setUseMDCLogging(true);
 
         context2 = new DefaultCamelContext();
+        context2.setUseMDCLogging(true);
         template2 = context2.createProducerTemplate();
         ServiceHelper.startServices(template2, context2);
 
         // add routes after CamelContext has been started
-        context2.addRoutes(createRouteBuilder());
+        context2.addRoutes(createRouteBuilder2());
     }
 
     public void tearDown() throws Exception {
@@ -51,4 +54,8 @@ public abstract class AbstractDistribute
     protected MockEndpoint getMockEndpoint2(String uri) {
         return context2.getEndpoint(uri, MockEndpoint.class);
     }
+
+    protected RouteBuilder createRouteBuilder2() throws Exception {
+        return createRouteBuilder();
+    }
 }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java?rev=1447397&r1=1447396&r2=1447397&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
 Mon Feb 18 17:17:29 2013
@@ -18,6 +18,7 @@ package org.apache.camel.processor.aggre
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
@@ -27,16 +28,37 @@ import org.apache.camel.processor.aggreg
  */
 public class DistributedCompletionIntervalTest extends AbstractDistributedTest 
{
 
-    public void testAggregateInterval() throws Exception {
+    private MemoryAggregationRepository sharedAggregationRepository = new 
MemoryAggregationRepository(true);
+
+    public void testCamelContext1Wins() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Message 19");
+        MockEndpoint mock2 = getMockEndpoint2("mock:result");
+        mock2.expectedMessageCount(0);
+
+        // ensure messages are send after the 1s
+        Thread.sleep(2000);
+        sendMessages();
+
+        mock.assertIsSatisfied();
+        mock2.assertIsSatisfied();
+    }
+
+    public void testCamelContext2Wins() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
         MockEndpoint mock2 = getMockEndpoint2("mock:result");
-        // by default the use latest aggregation strategy is used so we get 
message 18 and message 19
-        mock.expectedBodiesReceived("Message 18");
         mock2.expectedBodiesReceived("Message 19");
 
         // ensure messages are send after the 1s
         Thread.sleep(2000);
-        
+        sendMessages();
+
+        mock2.assertIsSatisfied();
+        mock.assertIsSatisfied();
+    }
+
+    private void sendMessages() {
         for (int i = 0; i < 20; i++) {
             int choice = i % 2;
             if (choice == 0) {
@@ -45,9 +67,6 @@ public class DistributedCompletionInterv
                 template2.sendBodyAndHeader("direct:start", "Message " + i, 
"id", "1");
             }
         }
-
-        mock.assertIsSatisfied();
-        mock2.assertIsSatisfied();
     }
 
     @Override
@@ -58,8 +77,28 @@ public class DistributedCompletionInterv
                 // START SNIPPET: e1
                 from("direct:start")
                     .aggregate(header("id"), new 
UseLatestAggregationStrategy())
+                        .aggregationRepository(sharedAggregationRepository)
+                        .optimisticLocking()
+                        // trigger completion every 5th second
+                        
.completionInterval(getName().equals("testCamelContext1Wins") ? 5000 : 10000)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder2() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("direct:start")
+                    .aggregate(header("id"), new 
UseLatestAggregationStrategy())
+                        .aggregationRepository(sharedAggregationRepository)
+                        .optimisticLocking()
                         // trigger completion every 5th second
-                        .completionInterval(5000)
+                        
.completionInterval(getName().equals("testCamelContext1Wins") ? 10000 : 5000)
                     .to("mock:result");
                 // END SNIPPET: e1
             }


Reply via email to