Repository: camel
Updated Branches:
  refs/heads/master 0f5b58161 -> 8c18fecf7


CAMEL-10195: rest-dsl - automatic binding failure with waitForTaskToComplete=Never


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8c18fecf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8c18fecf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8c18fecf

Branch: refs/heads/master
Commit: 8c18fecf7156dca6d2dbd87908914f465836d9eb
Parents: 0f5b581
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Aug 3 17:18:30 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Aug 3 17:18:30 2016 +0200

----------------------------------------------------------------------
 .../camel/component/seda/SedaProducer.java      | 20 +++++---------------
 .../apache/camel/impl/DefaultUnitOfWork.java    |  8 +++++++-
 .../java/org/apache/camel/spi/UnitOfWork.java   | 14 ++++++++++++++
 .../org/apache/camel/util/ExchangeHelper.java   | 19 ++++++++++++++++++-
 4 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8c18fecf/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index 3e34e8a..03ce9f3 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -27,6 +28,7 @@ import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.SynchronizationVetoable;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.ExchangeHelper;
 
@@ -180,23 +182,11 @@ public class SedaProducer extends DefaultAsyncProducer {
 
     protected Exchange prepareCopy(Exchange exchange, boolean handover) {
         // use a new copy of the exchange to route async (and use same message 
id)
-        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, 
true);
+
         // if handover we need to do special handover to avoid handing over
         // RestBindingMarshalOnCompletion as it should not be handed over with 
SEDA
-        if (handover) {
-            List<Synchronization> completions = exchange.handoverCompletions();
-            if (completions != null) {
-                for (Synchronization sync : completions) {
-                    if 
(sync.getClass().getName().contains("RestBindingMarshalOnCompletion")) {
-                        // keep this one
-                        exchange.addOnCompletion(sync);
-                    } else {
-                        // handover
-                        copy.addOnCompletion(sync);
-                    }
-                }
-            }
-        }
+        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
handover, true,
+                synchronization -> 
!synchronization.getClass().getName().contains("RestBindingMarshalOnCompletion"));
         // set a new from endpoint to be the seda queue
         copy.setFromEndpoint(endpoint);
         return copy;

http://git-wip-us.apache.org/repos/asf/camel/blob/8c18fecf/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
index 3fbd252..ae46435 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.Stack;
+import java.util.function.Predicate;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -190,6 +191,11 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
     }
 
     public void handoverSynchronization(Exchange target) {
+        handoverSynchronization(target, null);
+    }
+
+    @Override
+    public void handoverSynchronization(Exchange target, 
Predicate<Synchronization> filter) {
         if (synchronizations == null || synchronizations.isEmpty()) {
             return;
         }
@@ -204,7 +210,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
                 handover = veto.allowHandover();
             }
 
-            if (handover) {
+            if (handover && (filter == null || filter.test(synchronization))) {
                 log.trace("Handover synchronization {} to: {}", 
synchronization, target);
                 target.addOnCompletion(synchronization);
                 // remove it if its handed over

http://git-wip-us.apache.org/repos/asf/camel/blob/8c18fecf/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java b/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
index b165f43..99d1640 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spi;
 
+import java.util.function.Predicate;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -64,6 +66,18 @@ public interface UnitOfWork extends Service {
     void handoverSynchronization(Exchange target);
 
     /**
+     * Handover all the registered synchronizations to the target {@link 
org.apache.camel.Exchange}.
+     * <p/>
+     * This is used when a route turns into asynchronous and the {@link 
org.apache.camel.Exchange} that
+     * is continued and routed in the async thread should do the on completion 
callbacks instead of the
+     * original synchronous thread.
+     *
+     * @param target the target exchange
+     * @param filter optional filter to only handover if filter returns 
<tt>true</tt>
+     */
+    void handoverSynchronization(Exchange target, Predicate<Synchronization> 
filter);
+
+    /**
      * Invoked when this unit of work has been completed, whether it has 
failed or completed
      *
      * @param exchange the current exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/8c18fecf/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 030b78d..2c92cc5 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExchangeException;
@@ -44,6 +45,7 @@ import org.apache.camel.TypeConversionException;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.MessageSupport;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.UnitOfWork;
 
 /**
@@ -229,6 +231,21 @@ public final class ExchangeHelper {
      * @param useSameMessageId whether to use same message id on the copy 
message.
      */
     public static Exchange createCorrelatedCopy(Exchange exchange, boolean 
handover, boolean useSameMessageId) {
+        return createCorrelatedCopy(exchange, handover, useSameMessageId, 
null);
+    }
+
+    /**
+     * Creates a new instance and copies from the current message exchange so 
that it can be
+     * forwarded to another destination as a new instance. Unlike regular copy 
this operation
+     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its 
should be used
+     * for async messaging, where the original and copied exchange are 
independent.
+     *
+     * @param exchange original copy of the exchange
+     * @param handover whether the on completion callbacks should be handed 
over to the new copy.
+     * @param useSameMessageId whether to use same message id on the copy 
message.
+     * @param filter whether to handover the on completion
+     */
+    public static Exchange createCorrelatedCopy(Exchange exchange, boolean 
handover, boolean useSameMessageId, Predicate<Synchronization> filter) {
         String id = exchange.getExchangeId();
 
         // make sure to do a safe copy as the correlated copy can be routed 
independently of the source.
@@ -246,7 +263,7 @@ public final class ExchangeHelper {
         // hand over on completion to the copy if we got any
         UnitOfWork uow = exchange.getUnitOfWork();
         if (handover && uow != null) {
-            uow.handoverSynchronization(copy);
+            uow.handoverSynchronization(copy, filter);
         }
         // set a correlation id so we can track back the original exchange
         copy.setProperty(Exchange.CORRELATION_ID, id);

Reply via email to