Repository: camel Updated Branches: refs/heads/master 0f5b58161 -> 8c18fecf7
CAMEL-10195: rest-dsl - automatic binding failure with waitForTaskToComplete=Never Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8c18fecf Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8c18fecf Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8c18fecf Branch: refs/heads/master Commit: 8c18fecf7156dca6d2dbd87908914f465836d9eb Parents: 0f5b581 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Aug 3 17:18:30 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 3 17:18:30 2016 +0200 ---------------------------------------------------------------------- .../camel/component/seda/SedaProducer.java | 20 +++++--------------- .../apache/camel/impl/DefaultUnitOfWork.java | 8 +++++++- .../java/org/apache/camel/spi/UnitOfWork.java | 14 ++++++++++++++ .../org/apache/camel/util/ExchangeHelper.java | 19 ++++++++++++++++++- 4 files changed, 44 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8c18fecf/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java index 3e34e8a..03ce9f3 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; @@ -27,6 +28,7 @@ import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.spi.Synchronization; +import org.apache.camel.spi.SynchronizationVetoable; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.ExchangeHelper; @@ -180,23 +182,11 @@ public class SedaProducer extends DefaultAsyncProducer { protected Exchange prepareCopy(Exchange exchange, boolean handover) { // use a new copy of the exchange to route async (and use same message id) - Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, true); + // if handover we need to do special handover to avoid handing over // RestBindingMarshalOnCompletion as it should not be handed over with SEDA - if (handover) { - List<Synchronization> completions = exchange.handoverCompletions(); - if (completions != null) { - for (Synchronization sync : completions) { - if (sync.getClass().getName().contains("RestBindingMarshalOnCompletion")) { - // keep this one - exchange.addOnCompletion(sync); - } else { - // handover - copy.addOnCompletion(sync); - } - } - } - } + Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true, + synchronization -> !synchronization.getClass().getName().contains("RestBindingMarshalOnCompletion")); // set a new from endpoint to be the seda queue copy.setFromEndpoint(endpoint); return copy; http://git-wip-us.apache.org/repos/asf/camel/blob/8c18fecf/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java index 3fbd252..ae46435 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java @@ -23,6 +23,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.Stack; +import java.util.function.Predicate; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -190,6 +191,11 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { } public void handoverSynchronization(Exchange target) { + handoverSynchronization(target, null); + } + + @Override + public void handoverSynchronization(Exchange target, Predicate<Synchronization> filter) { if (synchronizations == null || synchronizations.isEmpty()) { return; } @@ -204,7 +210,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { handover = veto.allowHandover(); } - if (handover) { + if (handover && (filter == null || filter.test(synchronization))) { log.trace("Handover synchronization {} to: {}", synchronization, target); target.addOnCompletion(synchronization); // remove it if its handed over http://git-wip-us.apache.org/repos/asf/camel/blob/8c18fecf/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java b/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java index b165f43..99d1640 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java +++ b/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java @@ -16,6 +16,8 @@ */ package org.apache.camel.spi; +import java.util.function.Predicate; + import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -64,6 +66,18 @@ public interface UnitOfWork extends Service { void handoverSynchronization(Exchange target); /** + * Handover all the registered synchronizations to the target {@link org.apache.camel.Exchange}. + * <p/> + * This is used when a route turns into asynchronous and the {@link org.apache.camel.Exchange} that + * is continued and routed in the async thread should do the on completion callbacks instead of the + * original synchronous thread. + * + * @param target the target exchange + * @param filter optional filter to only handover if filter returns <tt>true</tt> + */ + void handoverSynchronization(Exchange target, Predicate<Synchronization> filter); + + /** * Invoked when this unit of work has been completed, whether it has failed or completed * * @param exchange the current exchange http://git-wip-us.apache.org/repos/asf/camel/blob/8c18fecf/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java index 030b78d..2c92cc5 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java @@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; import org.apache.camel.CamelContext; import org.apache.camel.CamelExchangeException; @@ -44,6 +45,7 @@ import org.apache.camel.TypeConversionException; import org.apache.camel.TypeConverter; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.MessageSupport; +import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.UnitOfWork; /** @@ -229,6 +231,21 @@ public final class ExchangeHelper { * @param useSameMessageId whether to use same message id on the copy message. */ public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) { + return createCorrelatedCopy(exchange, handover, useSameMessageId, null); + } + + /** + * Creates a new instance and copies from the current message exchange so that it can be + * forwarded to another destination as a new instance. Unlike regular copy this operation + * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used + * for async messaging, where the original and copied exchange are independent. + * + * @param exchange original copy of the exchange + * @param handover whether the on completion callbacks should be handed over to the new copy. + * @param useSameMessageId whether to use same message id on the copy message. + * @param filter whether to handover the on completion + */ + public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId, Predicate<Synchronization> filter) { String id = exchange.getExchangeId(); // make sure to do a safe copy as the correlated copy can be routed independently of the source. @@ -246,7 +263,7 @@ public final class ExchangeHelper { // hand over on completion to the copy if we got any UnitOfWork uow = exchange.getUnitOfWork(); if (handover && uow != null) { - uow.handoverSynchronization(copy); + uow.handoverSynchronization(copy, filter); } // set a correlation id so we can track back the original exchange copy.setProperty(Exchange.CORRELATION_ID, id);