Author: davsclaus
Date: Fri Mar 1 09:47:51 2013
New Revision: 1451544

URL: http://svn.apache.org/r1451544
Log:
CAMEL-6097: Fixed race condition in aggregate eip when doing recovery task.
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1451544&r1=1451543&r2=1451544&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Mar 1 09:47:51 2013
@@ -18,6 +18,7 @@ package org.apache.camel.processor.aggre
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -838,6 +839,9 @@ public class AggregateProcessor extends
             LOG.trace("Starting recover check");
+            // copy the current in progress before doing scan
+            final Set<String> copyOfInProgress = new LinkedHashSet<String>(inProgressCompleteExchanges);
+
             Set<String> exchangeIds = recoverable.scan(camelContext);
             for (String exchangeId : exchangeIds) {
@@ -847,7 +851,9 @@ public class AggregateProcessor extends
                     return;
                 }
-                boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
+                // consider in progress if it was in progress before we did the scan, or currently after we did the scan
+                // its safer to consider it in progress than risk duplicates due both in progress + recovered
+                boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
                 if (inProgress) {
                     LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
                 } else {