This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e5f3a83c64d8cf32b74334d85a65d5eb82407636
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun May 3 22:45:52 2020 +0200

    camel-core - Aggregate EIP should return true if processed synchronously.
---
 .../processor/aggregate/AggregateProcessor.java    | 42 +++++++++++-----------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index be34e22..638b9f5 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -317,16 +317,15 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            doProcess(exchange, callback);
+            return doProcess(exchange, callback);
         } catch (Throwable e) {
             exchange.setException(e);
-            callback.done(false);
+            callback.done(true);
+            return true;
         }
-        return false;
     }
 
-    protected void doProcess(Exchange exchange, AsyncCallback callback) throws 
Exception {
-
+    protected boolean doProcess(Exchange exchange, AsyncCallback callback) 
throws Exception {
         if (getStatistics().isStatisticsEnabled()) {
             totalIn.incrementAndGet();
         }
@@ -335,8 +334,8 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
         if (isCompleteAllGroups(exchange)) {
             removeFlagCompleteAllGroups(exchange);
             forceCompletionOfAllGroups();
-            callback.done(false);
-            return;
+            callback.done(true);
+            return true;
         }
 
         // compute correlation expression
@@ -348,30 +347,29 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
             } else {
                 exchange.setException(new CamelExchangeException("Invalid 
correlation key", exchange));
             }
-            callback.done(false);
-            return;
+            callback.done(true);
+            return true;
         }
 
         // is the correlation key closed?
         if (closedCorrelationKeys != null && 
closedCorrelationKeys.containsKey(key)) {
             exchange.setException(new ClosedCorrelationKeyException(key, 
exchange));
-            callback.done(false);
-            return;
+            callback.done(true);
+            return true;
         }
 
         if (optimisticLocking) {
-            doInOptimisticLock(exchange, key, callback, 0);
+            return doInOptimisticLock(exchange, key, callback, 0, true);
         } else {
-            doProcess(exchange, key, callback);
+            return doProcess(exchange, key, callback, true);
         }
     }
 
-    protected void doInOptimisticLock(Exchange exchange, String key, 
AsyncCallback callback, int attempt) {
+    protected boolean doInOptimisticLock(Exchange exchange, String key, 
AsyncCallback callback, int attempt, boolean sync) {
         while (true) {
             attempt++;
             try {
-                doProcess(exchange, key, callback);
-                return;
+                return doProcess(exchange, key, callback, sync);
             } catch 
(OptimisticLockingAggregationRepository.OptimisticLockingException e) {
                 LOG.trace("On attempt {} 
OptimisticLockingAggregationRepository: {} threw OptimisticLockingException 
while trying to aggregate exchange: {}",
                         attempt, aggregationRepository, exchange, e);
@@ -379,19 +377,20 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
                     long delay = optimisticLockRetryPolicy.getDelay(attempt);
                     if (delay > 0) {
                         int nextAttempt = attempt;
-                        getOptimisticLockingExecutorService().schedule(() -> 
doInOptimisticLock(exchange, key, callback, nextAttempt), delay, 
TimeUnit.MILLISECONDS);
+                        getOptimisticLockingExecutorService().schedule(() -> 
doInOptimisticLock(exchange, key, callback, nextAttempt, false), delay, 
TimeUnit.MILLISECONDS);
+                        return false;
                     }
                 } else {
                     exchange.setException(new 
CamelExchangeException("Exhausted optimistic locking retry attempts, tried " + 
attempt + " times", exchange,
                             new 
OptimisticLockingAggregationRepository.OptimisticLockingException()));
-                    callback.done(false);
-                    return;
+                    callback.done(sync);
+                    return sync;
                 }
             }
         }
     }
 
-    protected void doProcess(Exchange exchange, String key, AsyncCallback 
callback) {
+    protected boolean doProcess(Exchange exchange, String key, AsyncCallback 
callback, boolean sync) {
         // copy exchange, and do not share the unit of work
         // the aggregated output runs in another unit of work
         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
@@ -423,7 +422,8 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
             forceCompletionOfAllGroups();
         }
 
-        callback.done(false);
+        callback.done(sync);
+        return sync;
     }
 
     private Object removeFlagCompleteCurrentGroup(Exchange exchange) {

Reply via email to