Author: davsclaus
Date: Fri Mar  1 09:49:17 2013
New Revision: 1451547

URL: http://svn.apache.org/r1451547
Log:
CAMEL-6097: Fixed race condition in aggregate eip when doing recovery task.

Modified:
    camel/branches/camel-2.9.x/   (props changed)
    
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1451544
  Merged /camel/branches/camel-2.10.x:r1451546

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1451547&r1=1451546&r2=1451547&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 (original)
+++ 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 Fri Mar  1 09:49:17 2013
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -744,6 +745,9 @@ public class AggregateProcessor extends 
 
             LOG.trace("Starting recover check");
 
+            // copy the current in progress before doing scan
+            final Set<String> copyOfInProgress = new 
LinkedHashSet<String>(inProgressCompleteExchanges);
+
             Set<String> exchangeIds = recoverable.scan(camelContext);
             for (String exchangeId : exchangeIds) {
 
@@ -753,7 +757,9 @@ public class AggregateProcessor extends 
                     return;
                 }
 
-                boolean inProgress = 
inProgressCompleteExchanges.contains(exchangeId);
+                // consider in progress if it was in progress before we did 
the scan, or currently after we did the scan
+                // its safer to consider it in progress than risk duplicates 
due both in progress + recovered
+                boolean inProgress = copyOfInProgress.contains(exchangeId) || 
inProgressCompleteExchanges.contains(exchangeId);
                 if (inProgress) {
                     LOG.trace("Aggregated exchange with id: {} is already in 
progress.", exchangeId);
                 } else {


Reply via email to