Author: davsclaus Date: Fri Mar 1 09:49:17 2013 New Revision: 1451547 URL: http://svn.apache.org/r1451547 Log: CAMEL-6097: Fixed race condition in aggregate eip when doing recovery task.
Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1451544 Merged /camel/branches/camel-2.10.x:r1451546 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1451547&r1=1451546&r2=1451547&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Mar 1 09:49:17 2013 @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -744,6 +745,9 @@ public class AggregateProcessor extends LOG.trace("Starting recover check"); + // copy the current in progress before doing scan + final Set<String> copyOfInProgress = new LinkedHashSet<String>(inProgressCompleteExchanges); + Set<String> exchangeIds = recoverable.scan(camelContext); for (String exchangeId : exchangeIds) { @@ -753,7 +757,9 @@ public class AggregateProcessor extends return; } - boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); + // consider in progress if it was in progress before we did the scan, or currently after we did the scan + // its safer to consider it in progress than risk duplicates due both in progress + recovered + boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId); if (inProgress) { LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); } else {