This is an automated email from the ASF dual-hosted git repository.

klease pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new fd98cd3d50d CAMEL-18275: Address issue of completions not being run in SEDA pipeline (#8015)
fd98cd3d50d is described below

commit fd98cd3d50d1c72f79457ef50d96a27ec8e5e5be
Author: klease <38634989+kle...@users.noreply.github.com>
AuthorDate: Wed Aug 3 15:40:18 2022 +0200

    CAMEL-18275: Address issue of completions not being run in SEDA pipeline (#8015)
    
    When synchronizations are handed over, add a method to allow some housekeeping
    to be performed. In the OnCompletionProcessor.OnCompletionSynchronizationAfterConsumer,
    this method stores the routeId on the Exchange.
---
 .../apache/camel/spi/SynchronizationVetoable.java    |  9 +++++++++
 .../apache/camel/impl/engine/DefaultUnitOfWork.java  |  7 ++++++-
 .../camel/processor/OnCompletionProcessor.java       | 20 ++++++++++++++++++++
 .../apache/camel/support/SynchronizationAdapter.java |  5 +++++
 4 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java b/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java
index 167ae544037..87270e87d3c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.Exchange;
+
 /**
  * A vetoable {@link org.apache.camel.spi.Synchronization}.
  * <p/>
@@ -40,4 +42,11 @@ public interface SynchronizationVetoable extends Synchronization {
      * @return <tt>true</tt> to allow handover, <tt>false</tt> to deny.
      */
     boolean allowHandover();
+
+    /**
+     * A method to perform optional housekeeping when a Synchronization is 
being handed over.
+     * 
+     * @param target The Exchange to which the synchronizations are being 
transferred.
+     */
+    void beforeHandover(Exchange target);
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index fa8c5c75f19..9580c258a6f 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -210,14 +210,19 @@ public class DefaultUnitOfWork implements UnitOfWork {
             Synchronization synchronization = it.next();
 
             boolean handover = true;
+            SynchronizationVetoable veto = null;
             if (synchronization instanceof SynchronizationVetoable) {
-                SynchronizationVetoable veto = (SynchronizationVetoable) 
synchronization;
+                veto = (SynchronizationVetoable) synchronization;
                 handover = veto.allowHandover();
             }
 
             if (handover && (filter == null || filter.test(synchronization))) {
                 log.trace("Handover synchronization {} to: {}", 
synchronization, target);
                 
target.adapt(ExtendedExchange.class).addOnCompletion(synchronization);
+                // Allow the synchronization to do housekeeping before transfer
+                if (veto != null) {
+                    veto.beforeHandover(target);
+                }
                 // remove it if its handed over
                 it.remove();
             } else {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index 8261aa451ec..8607957db2e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -368,6 +368,26 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
                 return "onFailureOnly";
             }
         }
+
+        @Override
+        public void beforeHandover(Exchange target) {
+            // The onAfterRoute method will not be called after the handover
+            // To ensure that completions are called, remember the route IDs 
here.
+            // Assumption: the fromRouteId on the target Exchange is the route
+            // which owns the completion
+            LOG.debug("beforeHandover from Route {}", target.getFromRouteId());
+            final String exchangeRouteId = target.getFromRouteId();
+            if (routeScoped && exchangeRouteId != null && 
exchangeRouteId.equals(routeId)) {
+                List<String> routeIds = 
target.getProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, List.class);
+                if (routeIds == null) {
+                    routeIds = new ArrayList<>();
+                    
target.setProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, routeIds);
+                }
+                if (!routeIds.contains(exchangeRouteId)) {
+                    routeIds.add(exchangeRouteId);
+                }
+            }
+        }
     }
 
     private final class OnCompletionSynchronizationBeforeConsumer extends 
SynchronizationAdapter implements Ordered {
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java b/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java
index 6852686017f..7d9ce2c67f0 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java
@@ -63,4 +63,9 @@ public class SynchronizationAdapter implements SynchronizationVetoable, Ordered,
     public void onAfterRoute(Route route, Exchange exchange) {
         // noop
     }
+
+    @Override
+    public void beforeHandover(Exchange target) {
+        // noop
+    }
 }

Reply via email to