This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit e5f3a83c64d8cf32b74334d85a65d5eb82407636 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun May 3 22:45:52 2020 +0200 camel-core - Aggregate EIP should return true if processed synchronously. --- .../processor/aggregate/AggregateProcessor.java | 42 +++++++++++----------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index be34e22..638b9f5 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -317,16 +317,15 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat @Override public boolean process(Exchange exchange, AsyncCallback callback) { try { - doProcess(exchange, callback); + return doProcess(exchange, callback); } catch (Throwable e) { exchange.setException(e); - callback.done(false); + callback.done(true); + return true; } - return false; } - protected void doProcess(Exchange exchange, AsyncCallback callback) throws Exception { - + protected boolean doProcess(Exchange exchange, AsyncCallback callback) throws Exception { if (getStatistics().isStatisticsEnabled()) { totalIn.incrementAndGet(); } @@ -335,8 +334,8 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat if (isCompleteAllGroups(exchange)) { removeFlagCompleteAllGroups(exchange); forceCompletionOfAllGroups(); - callback.done(false); - return; + callback.done(true); + return true; } // compute correlation expression @@ -348,30 +347,29 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat } else { exchange.setException(new CamelExchangeException("Invalid correlation key", exchange)); } - callback.done(false); - return; + callback.done(true); + return true; } // is the correlation key closed? if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) { exchange.setException(new ClosedCorrelationKeyException(key, exchange)); - callback.done(false); - return; + callback.done(true); + return true; } if (optimisticLocking) { - doInOptimisticLock(exchange, key, callback, 0); + return doInOptimisticLock(exchange, key, callback, 0, true); } else { - doProcess(exchange, key, callback); + return doProcess(exchange, key, callback, true); } } - protected void doInOptimisticLock(Exchange exchange, String key, AsyncCallback callback, int attempt) { + protected boolean doInOptimisticLock(Exchange exchange, String key, AsyncCallback callback, int attempt, boolean sync) { while (true) { attempt++; try { - doProcess(exchange, key, callback); - return; + return doProcess(exchange, key, callback, sync); } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to aggregate exchange: {}", attempt, aggregationRepository, exchange, e); @@ -379,19 +377,20 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat long delay = optimisticLockRetryPolicy.getDelay(attempt); if (delay > 0) { int nextAttempt = attempt; - getOptimisticLockingExecutorService().schedule(() -> doInOptimisticLock(exchange, key, callback, nextAttempt), delay, TimeUnit.MILLISECONDS); + getOptimisticLockingExecutorService().schedule(() -> doInOptimisticLock(exchange, key, callback, nextAttempt, false), delay, TimeUnit.MILLISECONDS); + return false; } } else { exchange.setException(new CamelExchangeException("Exhausted optimistic locking retry attempts, tried " + attempt + " times", exchange, new OptimisticLockingAggregationRepository.OptimisticLockingException())); - callback.done(false); - return; + callback.done(sync); + return sync; } } } } - protected void doProcess(Exchange exchange, String key, AsyncCallback callback) { + protected boolean doProcess(Exchange exchange, String key, AsyncCallback callback, boolean sync) { // copy exchange, and do not share the unit of work // the aggregated output runs in another unit of work Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); @@ -423,7 +422,8 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat forceCompletionOfAllGroups(); } - callback.done(false); + callback.done(sync); + return sync; } private Object removeFlagCompleteCurrentGroup(Exchange exchange) {