CAMEL-6377: Optimized routing engine to reduce stack frames in use during 
routing. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee5487e3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee5487e3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee5487e3

Branch: refs/heads/master
Commit: ee5487e38c850d95e93d2e38478bd1573fbca990
Parents: 7593694
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon May 20 10:26:22 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 20 10:26:22 2013 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/AsyncProcessor.java |    6 -
 .../camel/component/direct/DirectProducer.java     |    6 +-
 .../camel/component/directvm/DirectVmProducer.java |    7 +-
 .../apache/camel/impl/InterceptSendToEndpoint.java |   18 ++-
 .../apache/camel/model/LoadBalancerDefinition.java |    3 +-
 .../apache/camel/processor/ChoiceProcessor.java    |    2 +-
 .../java/org/apache/camel/processor/Enricher.java  |    2 +-
 .../apache/camel/processor/MulticastProcessor.java |    2 +-
 .../org/apache/camel/processor/RecipientList.java  |    2 +-
 .../org/apache/camel/processor/RoutingSlip.java    |    2 +-
 .../org/apache/camel/processor/SendProcessor.java  |    3 +-
 .../org/apache/camel/processor/TryProcessor.java   |    2 +-
 .../loadbalancer/FailOverLoadBalancer.java         |    3 +-
 .../processor/loadbalancer/QueueLoadBalancer.java  |   28 ++---
 .../apache/camel/util/AsyncProcessorHelper.java    |    7 +-
 .../component/jms/EndpointMessageListener.java     |    3 +-
 .../routebox/direct/RouteboxDirectProducer.java    |   15 +--
 .../routebox/seda/RouteboxSedaConsumer.java        |    3 +-
 .../sjms/consumer/InOnlyMessageHandler.java        |   10 +--
 .../sjms/consumer/InOutMessageHandler.java         |    9 +-
 .../camel/spring/spi/TransactionErrorHandler.java  |   33 ++++--
 .../TransactionalClientDataSourceAsyncTest.java    |   86 +++++++++++++++
 22 files changed, 157 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java b/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
index bb5797b..1025320 100644
--- a/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
@@ -23,11 +23,6 @@ package org.apache.camel;
  * Any processor can be coerced to have an {@link AsyncProcessor} interface by using the
  * {@link org.apache.camel.impl.converter.AsyncProcessorTypeConverter#convert AsyncProcessorTypeConverter.covert}
  * method.
- * <p/>
- * <b>Important:<b/> Use the {@link org.apache.camel.util.AsyncProcessorHelper#process(AsyncProcessor, Exchange, AsyncCallback)}
- * method to invoke the process method, which ensure Camel have a chance to interweave and invoke it in a reliable manner.
- * For example when using transactions all the invocations has to occur in synchronous manner to ensure the transaction
- * work is done in the same thread, which is required by Spring TransactionManager.
  *
  * @version 
  */
@@ -44,7 +39,6 @@ public interface AsyncProcessor extends Processor {
      *                 If the exchange is completed synchronously, then the callback is also invoked synchronously.
      *                 The callback should therefore be careful of starting recursive loop.
      * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
-     * @see org.apache.camel.util.AsyncProcessorHelper#process(AsyncProcessor, Exchange, AsyncCallback)
      */
     boolean process(Exchange exchange, AsyncCallback callback);
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index 714641d..38d3919 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -17,11 +17,8 @@
 package org.apache.camel.component.direct;
 
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,8 +53,7 @@ public class DirectProducer extends DefaultAsyncProducer {
             callback.done(true);
             return true;
         } else {
-            AsyncProcessor processor = AsyncProcessorConverterHelper.convert(endpoint.getConsumer().getProcessor());
-            return AsyncProcessorHelper.process(processor, exchange, callback);
+            return endpoint.getConsumer().getAsyncProcessor().process(exchange, callback);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
index d032788..e175a61 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
@@ -17,12 +17,8 @@
 package org.apache.camel.component.directvm;
 
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,8 +57,7 @@ public class DirectVmProducer extends DefaultAsyncProducer {
             callback.done(true);
             return true;
         } else {
-            AsyncProcessor processor = AsyncProcessorConverterHelper.convert(consumer.getProcessor());
-            return AsyncProcessorHelper.process(processor, exchange, callback);
+            return endpoint.getConsumer().getAsyncProcessor().process(exchange, callback);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
index ff010dc..a23026e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
@@ -29,8 +29,6 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -159,9 +157,19 @@ public class InterceptSendToEndpoint implements Endpoint {
                         exchange.setOut(null);
                     }
 
-                    // route to original destination leveraging the asynchronous routing engine
-                    AsyncProcessor async = AsyncProcessorConverterHelper.convert(producer);
-                    return AsyncProcessorHelper.process(async, exchange, callback);
+                    // route to original destination leveraging the asynchronous routing engine if possible
+                    if (producer instanceof AsyncProcessor) {
+                        AsyncProcessor async = (AsyncProcessor) producer;
+                        return async.process(exchange, callback);
+                    } else {
+                        try {
+                            producer.process(exchange);
+                        } catch (Exception e) {
+                            exchange.setException(e);
+                        }
+                        callback.done(true);
+                        return true;
+                    }
                 } else {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java
index 4aaeda1..4cad474 100644
--- 
a/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java
+++ 
b/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java
@@ -27,7 +27,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.loadbalancer.LoadBalancer;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.ObjectHelper;
 
@@ -130,7 +129,7 @@ public class LoadBalancerDefinition extends IdentifiedType 
implements LoadBalanc
 
     public boolean process(Exchange exchange, final AsyncCallback callback) {
         ObjectHelper.notNull(loadBalancer, "loadBalancer");
-        return AsyncProcessorHelper.process(loadBalancer, exchange, new 
AsyncCallback() {
+        return loadBalancer.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 // only handle the async case
                 if (doneSync) {

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
index 30dd3dd..5c44256 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
@@ -112,7 +112,7 @@ public class ChoiceProcessor extends ServiceSupport 
implements AsyncProcessor, N
 
         // implement asynchronous routing logic in callback so we can have the 
callback being
         // triggered and then continue routing where we left
-        boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange, 
new AsyncCallback() {
+        boolean sync = asyncProcessor.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 // we only have to handle async completion of the pipeline
                 if (doneSync) {

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java 
b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index c55bc2c..8abea01 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -111,7 +111,7 @@ public class Enricher extends ServiceSupport implements 
AsyncProcessor {
         final Exchange resourceExchange = createResourceExchange(exchange, 
ExchangePattern.InOut);
 
         AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
-        boolean sync = AsyncProcessorHelper.process(ap, resourceExchange, new 
AsyncCallback() {
+        boolean sync = ap.process(resourceExchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 // we only have to handle async completion of the routing slip
                 if (doneSync) {

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index ecc682f..92a4987 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -568,7 +568,7 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
             // let the prepared process it, remember to begin the exchange pair
             AsyncProcessor async = 
AsyncProcessorConverterHelper.convert(processor);
             pair.begin();
-            sync = AsyncProcessorHelper.process(async, exchange, new 
AsyncCallback() {
+            sync = async.process(exchange, new AsyncCallback() {
                 public void done(boolean doneSync) {
                     // we are done with the exchange pair
                     pair.done();

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index 5c054b2..262fbc9 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -148,7 +148,7 @@ public class RecipientList extends ServiceSupport 
implements AsyncProcessor {
         }
 
         // now let the multicast process the exchange
-        return AsyncProcessorHelper.process(target, exchange, callback);
+        return target.process(exchange, callback);
     }
 
     protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java 
b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 546e061..037a4ad 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -284,7 +284,7 @@ public class RoutingSlip extends ServiceSupport implements 
AsyncProcessor, Trace
                 exchange.setProperty(Exchange.TO_ENDPOINT, 
endpoint.getEndpointUri());
                 exchange.setProperty(Exchange.SLIP_ENDPOINT, 
endpoint.getEndpointUri());
 
-                boolean sync = AsyncProcessorHelper.process(asyncProducer, 
exchange, new AsyncCallback() {
+                boolean sync = asyncProducer.process(exchange, new 
AsyncCallback() {
                     public void done(boolean doneSync) {
                         // we only have to handle async completion of the 
routing slip
                         if (doneSync) {

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
index ad20191..eb8e97c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -31,7 +31,6 @@ import org.apache.camel.Traceable;
 import org.apache.camel.impl.InterceptSendToEndpoint;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.URISupport;
@@ -119,7 +118,7 @@ public class SendProcessor extends ServiceSupport 
implements AsyncProcessor, Tra
                                              ExchangePattern pattern, final 
AsyncCallback callback) {
                 final Exchange target = configureExchange(exchange, pattern);
                 log.debug(">>>> {} {}", destination, exchange);
-                return AsyncProcessorHelper.process(asyncProducer, target, new 
AsyncCallback() {
+                return asyncProducer.process(target, new AsyncCallback() {
                     public void done(boolean doneSync) {
                         // restore previous MEP
                         target.setPattern(existingPattern);

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
index 82b3a43..b83eed1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -106,7 +106,7 @@ public class TryProcessor extends ServiceSupport implements 
AsyncProcessor, Navi
 
         // implement asynchronous routing logic in callback so we can have the 
callback being
         // triggered and then continue routing where we left
-        boolean sync = AsyncProcessorHelper.process(processor, exchange, new 
AsyncCallback() {
+        boolean sync = processor.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 // we only have to handle async completion of the pipeline
                 if (doneSync) {

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
index d919c9e..42b76f4 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
@@ -28,7 +28,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 
@@ -244,7 +243,7 @@ public class FailOverLoadBalancer extends 
LoadBalancerSupport implements Traceab
         log.debug("Processing failover at attempt {} for {}", attempts, copy);
 
         AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
-        return AsyncProcessorHelper.process(albp, copy, new 
FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors));
+        return albp.process(copy, new FailOverAsyncCallback(exchange, copy, 
attempts, index, callback, processors));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
index 632dec1..916643d 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
@@ -22,7 +22,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
 
 /**
@@ -40,25 +39,18 @@ public abstract class QueueLoadBalancer extends 
LoadBalancerSupport {
             if (processor == null) {
                 throw new IllegalStateException("No processors could be chosen 
to process " + exchange);
             } else {
-                AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
-                boolean sync = AsyncProcessorHelper.process(albp, exchange, new AsyncCallback() {
-                    public void done(boolean doneSync) {
-                        // only handle the async case
-                        if (doneSync) {
-                            return;
-                        }
-
-                        callback.done(false);
+                if (processor instanceof AsyncProcessor) {
+                    AsyncProcessor async = (AsyncProcessor) processor;
+                    return async.process(exchange, callback);
+                } else {
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        exchange.setException(e);
                     }
-                });
-
-                if (!sync) {
-                    // will continue routing asynchronously
-                    return false;
+                    callback.done(true);
+                    return true;
                 }
-
-                callback.done(true);
-                return true;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java 
b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
index 0875dae..833e53f 100644
--- a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
@@ -45,11 +45,11 @@ public final class AsyncProcessorHelper {
      * @param exchange  the exchange
      * @param callback  the callback
      * @return <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
+     * @deprecated should no longer be needed, instead invoke the process method on the {@link AsyncProcessor} directly,
+     * instead of using this method.
      */
     @Deprecated
     public static boolean process(final AsyncProcessor processor, final 
Exchange exchange, final AsyncCallback callback) {
-        // TODO: This method is no longer needed, and we can avoid using it
-
         boolean sync;
 
         if (exchange.isTransacted()) {
@@ -92,6 +92,9 @@ public final class AsyncProcessorHelper {
      * Calls the async version of the processor's process method and waits
      * for it to complete before returning. This can be used by {@link AsyncProcessor}
      * objects to implement their sync version of the process method.
+     * <p/>
+     * <b>Important:</b> This method is discouraged to be used, as its better to invoke the asynchronous
+     * {@link AsyncProcessor#process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method, whenever possible.
      *
      * @param processor the processor
      * @param exchange  the exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index 1496503..a20f73a 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -30,7 +30,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.RollbackExchangeException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,7 +111,7 @@ public class EndpointMessageListener implements 
MessageListener {
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Processing exchange {} asynchronously", 
exchange.getExchangeId());
                 }
-                boolean sync = AsyncProcessorHelper.process(processor, 
exchange, callback);
+                boolean sync = processor.process(exchange, callback);
                 if (!sync) {
                     // will be done async so return now
                     return;

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
 
b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
index 2a1fb38..b227240 100644
--- 
a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
+++ 
b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
@@ -26,8 +26,6 @@ import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.routebox.RouteboxServiceSupport;
 import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
-import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,18 +68,9 @@ public class RouteboxDirectProducer extends 
RouteboxServiceSupport implements Pr
                 RouteboxDispatcher dispatcher = new 
RouteboxDispatcher(producer);
                 exchange = dispatcher.dispatchAsync(getRouteboxEndpoint(), 
exchange);      
                 if (getRouteboxEndpoint().getConfig().isSendToConsumer()) {
-                    AsyncProcessor processor = AsyncProcessorConverterHelper.convert(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getProcessor());
-                    flag = AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
-                        public void done(boolean doneSync) {
-                            // we only have to handle async completion of this 
policy
-                            if (doneSync) {
-                                return;
-                            }
-                            callback.done(false);
-                        }
-                    });
+                    AsyncProcessor processor = ((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getAsyncProcessor();
+                    flag = processor.process(exchange, callback);
                 } 
-                callback.done(true);
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error processing 
exchange", exchange, e);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
 
b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
index 9db3a42..3c74870 100644
--- 
a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
+++ 
b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
@@ -31,7 +31,6 @@ import 
org.apache.camel.component.routebox.RouteboxServiceSupport;
 import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,7 +92,7 @@ public class RouteboxSedaConsumer extends 
RouteboxServiceSupport implements Rout
                     LOG.debug("Dispatching to inner route: {}", exchange);
                     RouteboxDispatcher dispatcher = new 
RouteboxDispatcher(producer);
                     result = dispatcher.dispatchAsync(getRouteboxEndpoint(), 
exchange); 
-                    AsyncProcessorHelper.process(processor, result, new 
AsyncCallback() {
+                    processor.process(result, new AsyncCallback() {
                         public void done(boolean doneSync) {
                             // noop
                         }

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
index b21de49..53ca48c 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
@@ -22,7 +22,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.Synchronization;
-import org.apache.camel.util.AsyncProcessorHelper;
 
 /**
  * An InOnly {@link AbstractMessageHandler}
@@ -66,7 +65,7 @@ public class InOnlyMessageHandler extends 
AbstractMessageHandler {
                     log.debug("Synchronous processing: Message[{}], 
Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri());
                 }
                 try {
-                    AsyncProcessorHelper.process(getProcessor(), exchange);
+                    getProcessor().process(exchange);
                 } catch (Exception e) {
                     exchange.setException(e);
                 } finally {
@@ -75,13 +74,8 @@ public class InOnlyMessageHandler extends 
AbstractMessageHandler {
             } else {
                 // process asynchronous using the async routing engine
                 log.debug("Aynchronous processing: Message[{}], 
Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri());
-                boolean sync = false;
 
-                sync = AsyncProcessorHelper.process(getProcessor(), exchange, 
callback);
-                if (!sync) {
-                    // will be done async so return now
-                    return;
-                }
+                getProcessor().process(exchange, callback);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
index bc38179..26b9a08 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
@@ -37,7 +37,6 @@ import 
org.apache.camel.component.sjms.SjmsExchangeMessageHelper;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.spi.Synchronization;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
 
 /**
@@ -118,7 +117,7 @@ public class InOutMessageHandler extends 
AbstractMessageHandler {
                     // do so
                     log.debug("Synchronous processing: Message[{}], 
Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri());
                     try {
-                        AsyncProcessorHelper.process(getProcessor(), exchange);
+                        getProcessor().process(exchange);
                     } catch (Exception e) {
                         exchange.setException(e);
                     } finally {
@@ -127,11 +126,7 @@ public class InOutMessageHandler extends 
AbstractMessageHandler {
                 } else {
                     // process asynchronous using the async routing engine
                     log.debug("Aynchronous processing: Message[{}], 
Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri());
-                    boolean sync = 
AsyncProcessorHelper.process(getProcessor(), exchange, callback);
-                    if (!sync) {
-                        // will be done async so return now
-                        return;
-                    }
+                    getProcessor().process(exchange, callback);
                 }
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
 
b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
index f072892..27721e8 100644
--- 
a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
+++ 
b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.spring.spi;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.camel.AsyncCallback;
@@ -27,6 +28,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.processor.RedeliveryErrorHandler;
 import org.apache.camel.processor.RedeliveryPolicy;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -213,18 +215,36 @@ public class TransactionErrorHandler extends 
RedeliveryErrorHandler {
      * @param exchange the exchange
      */
     protected void processByErrorHandler(final Exchange exchange) {
-        // must invoke the async method with empty callback to have it invoke 
the
-        // super.processErrorHandler
-        // we are transacted so we have to route synchronously so don't worry 
about returned
-        // value from the process method
-        // and the camel routing engine will detect this is an transacted 
Exchange and route
-        // it fully synchronously so we don't have to wait here if we hit an 
async endpoint
-        // all that is taken care of in the camel-core
-        super.process(exchange, new AsyncCallback() {
+        // the callback releases this latch on asynchronous completion
+        final CountDownLatch latch = new CountDownLatch(1);
+        boolean sync = super.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
-                // noop
+                if (!doneSync) {
+                    log.trace("Asynchronous callback received for exchangeId: 
{}", exchange.getExchangeId());
+                    latch.countDown();
+                }
+            }
+
+            @Override
+            public String toString() {
+                // refer to the enclosing error handler; a bare 'this'
+                // is this callback, whose toString() would recurse forever
+                return "Done " + TransactionErrorHandler.this;
             }
         });
+        if (!sync) {
+            log.trace("Waiting for asynchronous callback before continuing for 
exchangeId: {} -> {}",
+                    exchange.getExchangeId(), exchange);
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to code further up the stack
+                Thread.currentThread().interrupt();
+                exchange.setException(e);
+            }
+            log.trace("Asynchronous callback received, will continue routing 
exchangeId: {} -> {}",
+                    exchange.getExchangeId(), exchange);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceAsyncTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceAsyncTest.java
 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceAsyncTest.java
new file mode 100644
index 0000000..641c498
--- /dev/null
+++ 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceAsyncTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.interceptor;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.async.MyAsyncComponent;
+import org.apache.camel.spring.SpringRouteBuilder;
+import org.apache.camel.spring.spi.SpringTransactionPolicy;
+
+/**
+ * Transactional client pattern test; the routes send to async:bye:camel.
+ */
+public class TransactionalClientDataSourceAsyncTest extends 
TransactionalClientDataSourceTest {
+
+    public void testTransactionRollback() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:error");
+        mock.expectedMessageCount(1);
+
+        try {
+            template.sendBody("direct:fail", "Hello World");
+            fail("Should have thrown exception");
+        } catch (RuntimeCamelException e) {
+            // expected; the cause chain is verified below
+            assertIsInstanceOf(RuntimeCamelException.class, e.getCause());
+            assertTrue(e.getCause().getCause() instanceof 
IllegalArgumentException);
+            assertEquals("We don't have Donkeys, only Camels", 
e.getCause().getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        int count = jdbc.queryForInt("select count(*) from books");
+        assertEquals("Number of books", 1, count);
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new SpringRouteBuilder() {
+            public void configure() throws Exception {
+
+                context.addComponent("async", new MyAsyncComponent());
+
+                // use required as transaction policy
+                SpringTransactionPolicy required = 
lookup("PROPAGATION_REQUIRED", SpringTransactionPolicy.class);
+
+                // configure to use transaction error handler and pass on the 
required as it will fetch
+                // the transaction manager from it that it needs
+                errorHandler(transactionErrorHandler(required));
+
+                // IllegalArgumentException: not handled, sent to mock:error
+                
onException(IllegalArgumentException.class).handled(false).to("mock:error");
+
+                from("direct:okay")
+                    .policy(required)
+                    .setBody(constant("Tiger in 
Action")).beanRef("bookService")
+                    .log("Before thread ${threadName}")
+                    .to("async:bye:camel")
+                    .log("After thread ${threadName}")
+                    .setBody(constant("Elephant in 
Action")).beanRef("bookService");
+
+                from("direct:fail")
+                    .policy(required)
+                    .setBody(constant("Tiger in 
Action")).beanRef("bookService")
+                    .log("Before thread ${threadName}")
+                    .to("async:bye:camel")
+                    .log("After thread ${threadName}")
+                    .setBody(constant("Donkey in 
Action")).beanRef("bookService");
+            }
+        };
+    }
+
+}

Reply via email to