This is an automated email from the ASF dual-hosted git repository. klease pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new fd98cd3d50d CAMEL-18275: Address issue of completions not being run in SEDA pipeline (#8015) fd98cd3d50d is described below commit fd98cd3d50d1c72f79457ef50d96a27ec8e5e5be Author: klease <38634989+kle...@users.noreply.github.com> AuthorDate: Wed Aug 3 15:40:18 2022 +0200 CAMEL-18275: Address issue of completions not being run in SEDA pipeline (#8015) When synchronizations are handed over, add a method to allow some housekeeping to be performed. In the OnCompletionProcessor.OnCompletionSynchronizationAfterConsumer this method stores the routeId on the Exchange. --- .../apache/camel/spi/SynchronizationVetoable.java | 9 +++++++++ .../apache/camel/impl/engine/DefaultUnitOfWork.java | 7 ++++++- .../camel/processor/OnCompletionProcessor.java | 20 ++++++++++++++++++++ .../apache/camel/support/SynchronizationAdapter.java | 5 +++++ 4 files changed, 40 insertions(+), 1 deletion(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java b/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java index 167ae544037..87270e87d3c 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java @@ -16,6 +16,8 @@ */ package org.apache.camel.spi; +import org.apache.camel.Exchange; + /** * A vetoable {@link org.apache.camel.spi.Synchronization}. * <p/> @@ -40,4 +42,11 @@ public interface SynchronizationVetoable extends Synchronization { * @return <tt>true</tt> to allow handover, <tt>false</tt> to deny. */ boolean allowHandover(); + + /** + * A method to perform optional housekeeping when a Synchronization is being handed over. + * + * @param target The Exchange to which the synchronizations are being transferred. + */ + void beforeHandover(Exchange target); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index fa8c5c75f19..9580c258a6f 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -210,14 +210,19 @@ public class DefaultUnitOfWork implements UnitOfWork { Synchronization synchronization = it.next(); boolean handover = true; + SynchronizationVetoable veto = null; if (synchronization instanceof SynchronizationVetoable) { - SynchronizationVetoable veto = (SynchronizationVetoable) synchronization; + veto = (SynchronizationVetoable) synchronization; handover = veto.allowHandover(); } if (handover && (filter == null || filter.test(synchronization))) { log.trace("Handover synchronization {} to: {}", synchronization, target); target.adapt(ExtendedExchange.class).addOnCompletion(synchronization); + // Allow the synchronization to do housekeeping before transfer + if (veto != null) { + veto.beforeHandover(target); + } // remove it if its handed over it.remove(); } else { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index 8261aa451ec..8607957db2e 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -368,6 +368,26 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac return "onFailureOnly"; } } + + @Override + public void beforeHandover(Exchange target) { + // The onAfterRoute method will not be called after the handover + // To ensure that completions are called, remember the route IDs here. + // Assumption: the fromRouteId on the target Exchange is the route + // which owns the completion + LOG.debug("beforeHandover from Route {}", target.getFromRouteId()); + final String exchangeRouteId = target.getFromRouteId(); + if (routeScoped && exchangeRouteId != null && exchangeRouteId.equals(routeId)) { + List<String> routeIds = target.getProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, List.class); + if (routeIds == null) { + routeIds = new ArrayList<>(); + target.setProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, routeIds); + } + if (!routeIds.contains(exchangeRouteId)) { + routeIds.add(exchangeRouteId); + } + } + } } private final class OnCompletionSynchronizationBeforeConsumer extends SynchronizationAdapter implements Ordered { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java b/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java index 6852686017f..7d9ce2c67f0 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java @@ -63,4 +63,9 @@ public class SynchronizationAdapter implements SynchronizationVetoable, Ordered, public void onAfterRoute(Route route, Exchange exchange) { // noop } + + @Override + public void beforeHandover(Exchange target) { + // noop + } }