CAMEL-8010: Lock the critical section to avoid a race condition when the AggregateTimeOutChecker completes at the same time as the Recover task
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/655c771c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/655c771c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/655c771c Branch: refs/heads/master Commit: 655c771c330f2ce404b4515d3c649da7b8a22a35 Parents: 8163a8f Author: Rajithamol <[email protected]> Authored: Wed Aug 16 14:10:30 2017 -0400 Committer: Andrea Cosentino <[email protected]> Committed: Thu Aug 24 11:18:46 2017 +0200 ---------------------------------------------------------------------- .../processor/aggregate/AggregateProcessor.java | 113 ++++++++++--------- 1 file changed, 61 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/655c771c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 50978a0..6d2c5a3 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -1223,68 +1223,77 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor LOG.info("We are shutting down so stop recovering"); return; } + if(!optimisticLocking){ + lock.lock(); + } + try { + // consider in progress if it was in progress before we did the scan, or currently after we did the scan + // its safer to consider it in progress than risk duplicates due both in progress + recovered + boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId); + if (inProgress) { + LOG.trace("Aggregated exchange with 
id: {} is already in progress.", exchangeId); + } else { + LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId); + Exchange exchange = recoverable.recover(camelContext, exchangeId); + if (exchange != null) { + // get the correlation key + String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); + // and mark it as redelivered + exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE); + + // get the current redelivery data + RedeliveryData data = redeliveryState.get(exchange.getExchangeId()); + + // if we are exhausted, then move to dead letter channel + if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) { + LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries() + + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri()); + + // send to DLC + try { + // set redelivery counter + exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); + exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); + deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange); + } catch (Throwable e) { + exchange.setException(e); + } - // consider in progress if it was in progress before we did the scan, or currently after we did the scan - // its safer to consider it in progress than risk duplicates due both in progress + recovered - boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId); - if (inProgress) { - LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); - } else { - LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId); - Exchange exchange = recoverable.recover(camelContext, exchangeId); - if (exchange != null) { - // get the correlation key - String key = 
exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); - // and mark it as redelivered - exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE); - - // get the current redelivery data - RedeliveryData data = redeliveryState.get(exchange.getExchangeId()); - - // if we are exhausted, then move to dead letter channel - if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) { - LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries() - + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri()); + // handle if failed + if (exchange.getException() != null) { + getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException()); + } else { + // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again + recoverable.confirm(camelContext, exchangeId); + } + } else { + // update current redelivery state + if (data == null) { + // create new data + data = new RedeliveryData(); + redeliveryState.put(exchange.getExchangeId(), data); + } + data.redeliveryCounter++; - // send to DLC - try { // set redelivery counter exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); - exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); - deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange); - } catch (Throwable e) { - exchange.setException(e); - } + if (recoverable.getMaximumRedeliveries() > 0) { + exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries()); + } - // handle if failed - if (exchange.getException() != null) { - getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException()); - } else { - 
// it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again - recoverable.confirm(camelContext, exchangeId); - } - } else { - // update current redelivery state - if (data == null) { - // create new data - data = new RedeliveryData(); - redeliveryState.put(exchange.getExchangeId(), data); - } - data.redeliveryCounter++; + LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId); - // set redelivery counter - exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); - if (recoverable.getMaximumRedeliveries() > 0) { - exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries()); + // not exhaust so resubmit the recovered exchange + onSubmitCompletion(key, exchange); } - - LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId); - - // not exhaust so resubmit the recovered exchange - onSubmitCompletion(key, exchange); } } } + finally { + if(!optimisticLocking){ + lock.unlock(); + } + } } LOG.trace("Recover check complete");
