CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee5487e3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee5487e3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee5487e3 Branch: refs/heads/master Commit: ee5487e38c850d95e93d2e38478bd1573fbca990 Parents: 7593694 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon May 20 10:26:22 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 20 10:26:22 2013 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/camel/AsyncProcessor.java | 6 - .../camel/component/direct/DirectProducer.java | 6 +- .../camel/component/directvm/DirectVmProducer.java | 7 +- .../apache/camel/impl/InterceptSendToEndpoint.java | 18 ++- .../apache/camel/model/LoadBalancerDefinition.java | 3 +- .../apache/camel/processor/ChoiceProcessor.java | 2 +- .../java/org/apache/camel/processor/Enricher.java | 2 +- .../apache/camel/processor/MulticastProcessor.java | 2 +- .../org/apache/camel/processor/RecipientList.java | 2 +- .../org/apache/camel/processor/RoutingSlip.java | 2 +- .../org/apache/camel/processor/SendProcessor.java | 3 +- .../org/apache/camel/processor/TryProcessor.java | 2 +- .../loadbalancer/FailOverLoadBalancer.java | 3 +- .../processor/loadbalancer/QueueLoadBalancer.java | 28 ++--- .../apache/camel/util/AsyncProcessorHelper.java | 7 +- .../component/jms/EndpointMessageListener.java | 3 +- .../routebox/direct/RouteboxDirectProducer.java | 15 +-- .../routebox/seda/RouteboxSedaConsumer.java | 3 +- .../sjms/consumer/InOnlyMessageHandler.java | 10 +-- .../sjms/consumer/InOutMessageHandler.java | 9 +- .../camel/spring/spi/TransactionErrorHandler.java | 33 ++++-- .../TransactionalClientDataSourceAsyncTest.java | 86 +++++++++++++++ 22 files changed, 157 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java b/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java index bb5797b..1025320 100644 --- a/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java @@ -23,11 +23,6 @@ package org.apache.camel; * Any processor can be coerced to have an {@link AsyncProcessor} interface by using the * {@link org.apache.camel.impl.converter.AsyncProcessorTypeConverter#convert AsyncProcessorTypeConverter.covert} * method. - * <p/> - * <b>Important:<b/> Use the {@link org.apache.camel.util.AsyncProcessorHelper#process(AsyncProcessor, Exchange, AsyncCallback)} - * method to invoke the process method, which ensure Camel have a chance to interweave and invoke it in a reliable manner. - * For example when using transactions all the invocations has to occur in synchronous manner to ensure the transaction - * work is done in the same thread, which is required by Spring TransactionManager. * * @version */ @@ -44,7 +39,6 @@ public interface AsyncProcessor extends Processor { * If the exchange is completed synchronously, then the callback is also invoked synchronously. * The callback should therefore be careful of starting recursive loop. * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously - * @see org.apache.camel.util.AsyncProcessorHelper#process(AsyncProcessor, Exchange, AsyncCallback) */ boolean process(Exchange exchange, AsyncCallback callback); } http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java index 714641d..38d3919 100644 --- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java @@ -17,11 +17,8 @@ package org.apache.camel.component.direct; import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultAsyncProducer; -import org.apache.camel.util.AsyncProcessorConverterHelper; -import org.apache.camel.util.AsyncProcessorHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,8 +53,7 @@ public class DirectProducer extends DefaultAsyncProducer { callback.done(true); return true; } else { - AsyncProcessor processor = AsyncProcessorConverterHelper.convert(endpoint.getConsumer().getProcessor()); - return AsyncProcessorHelper.process(processor, exchange, callback); + return endpoint.getConsumer().getAsyncProcessor().process(exchange, callback); } } http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java index d032788..e175a61 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java @@ -17,12 +17,8 @@ package org.apache.camel.component.directvm; import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; -import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultAsyncProducer; -import org.apache.camel.util.AsyncProcessorConverterHelper; -import org.apache.camel.util.AsyncProcessorHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +57,7 @@ public class DirectVmProducer extends DefaultAsyncProducer { callback.done(true); return true; } else { - AsyncProcessor processor = AsyncProcessorConverterHelper.convert(consumer.getProcessor()); - return AsyncProcessorHelper.process(processor, exchange, callback); + return endpoint.getConsumer().getAsyncProcessor().process(exchange, callback); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java index ff010dc..a23026e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java @@ -29,8 +29,6 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.util.AsyncProcessorConverterHelper; -import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,9 +157,19 @@ public class InterceptSendToEndpoint implements Endpoint { exchange.setOut(null); } - // route to original destination leveraging the asynchronous routing engine - AsyncProcessor async = AsyncProcessorConverterHelper.convert(producer); - return AsyncProcessorHelper.process(async, exchange, callback); + // route to original destination leveraging the asynchronous routing engine if possible + if (producer instanceof AsyncProcessor) { + AsyncProcessor async = (AsyncProcessor) producer; + return async.process(exchange, callback); + } else { + try { + producer.process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + callback.done(true); + return true; + } } else { if (LOG.isDebugEnabled()) { LOG.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange); http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java index 4aaeda1..4cad474 100644 --- a/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java @@ -27,7 +27,6 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.processor.loadbalancer.LoadBalancer; import org.apache.camel.spi.RouteContext; -import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.IntrospectionSupport; import org.apache.camel.util.ObjectHelper; @@ -130,7 +129,7 @@ public class LoadBalancerDefinition extends IdentifiedType implements LoadBalanc public boolean process(Exchange exchange, final AsyncCallback callback) { ObjectHelper.notNull(loadBalancer, "loadBalancer"); - return AsyncProcessorHelper.process(loadBalancer, exchange, new AsyncCallback() { + return loadBalancer.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // only handle the async case if (doneSync) { http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java index 30dd3dd..5c44256 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java @@ -112,7 +112,7 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N // implement asynchronous routing logic in callback so we can have the callback being // triggered and then continue routing where we left - boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange, new AsyncCallback() { + boolean sync = asyncProcessor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the pipeline if (doneSync) { http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/Enricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java index c55bc2c..8abea01 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java @@ -111,7 +111,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor { final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer); - boolean sync = AsyncProcessorHelper.process(ap, resourceExchange, new AsyncCallback() { + boolean sync = ap.process(resourceExchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the routing slip if (doneSync) { http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index ecc682f..92a4987 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -568,7 +568,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor // let the prepared process it, remember to begin the exchange pair AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); pair.begin(); - sync = AsyncProcessorHelper.process(async, exchange, new AsyncCallback() { + sync = async.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we are done with the exchange pair pair.done(); http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java index 5c054b2..262fbc9 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java @@ -148,7 +148,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor { } // now let the multicast process the exchange - return AsyncProcessorHelper.process(target, exchange, callback); + return target.process(exchange, callback); } protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) { http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java index 546e061..037a4ad 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -284,7 +284,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri()); - boolean sync = AsyncProcessorHelper.process(asyncProducer, exchange, new AsyncCallback() { + boolean sync = asyncProducer.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the routing slip if (doneSync) { http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java index ad20191..eb8e97c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -31,7 +31,6 @@ import org.apache.camel.Traceable; import org.apache.camel.impl.InterceptSendToEndpoint; import org.apache.camel.impl.ProducerCache; import org.apache.camel.support.ServiceSupport; -import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.URISupport; @@ -119,7 +118,7 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Tra ExchangePattern pattern, final AsyncCallback callback) { final Exchange target = configureExchange(exchange, pattern); log.debug(">>>> {} {}", destination, exchange); - return AsyncProcessorHelper.process(asyncProducer, target, new AsyncCallback() { + return asyncProducer.process(target, new AsyncCallback() { public void done(boolean doneSync) { // restore previous MEP target.setPattern(existingPattern); http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java index 82b3a43..b83eed1 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java @@ -106,7 +106,7 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi // implement asynchronous routing logic in callback so we can have the callback being // triggered and then continue routing where we left - boolean sync = AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() { + boolean sync = processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the pipeline if (doneSync) { http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java index d919c9e..42b76f4 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java @@ -28,7 +28,6 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.util.AsyncProcessorConverterHelper; -import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; @@ -244,7 +243,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab log.debug("Processing failover at attempt {} for {}", attempts, copy); AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); - return AsyncProcessorHelper.process(albp, copy, new FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors)); + return albp.process(copy, new FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors)); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java index 632dec1..916643d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java @@ -22,7 +22,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; /** @@ -40,25 +39,18 @@ public abstract class QueueLoadBalancer extends LoadBalancerSupport { if (processor == null) { throw new IllegalStateException("No processors could be chosen to process " + exchange); } else { - AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); - boolean sync = AsyncProcessorHelper.process(albp, exchange, new AsyncCallback() { - public void done(boolean doneSync) { - // only handle the async case - if (doneSync) { - return; - } - - callback.done(false); + if (processor instanceof AsyncProcessor) { + AsyncProcessor async = (AsyncProcessor) processor; + return async.process(exchange, callback); + } else { + try { + processor.process(exchange); + } catch (Exception e) { + exchange.setException(e); } - }); - - if (!sync) { - // will continue routing asynchronously - return false; + callback.done(true); + return true; } - - callback.done(true); - return true; } } http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java index 0875dae..833e53f 100644 --- a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java @@ -45,11 +45,11 @@ public final class AsyncProcessorHelper { * @param exchange the exchange * @param callback the callback * @return <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously + * @deprecated should no longer be needed, instead invoke the process method on the {@link AsyncProcessor} directly, + * instead of using this method. */ @Deprecated public static boolean process(final AsyncProcessor processor, final Exchange exchange, final AsyncCallback callback) { - // TODO: This method is no longer needed, and we can avoid using it - boolean sync; if (exchange.isTransacted()) { @@ -92,6 +92,9 @@ public final class AsyncProcessorHelper { * Calls the async version of the processor's process method and waits * for it to complete before returning. This can be used by {@link AsyncProcessor} * objects to implement their sync version of the process method. + * <p/> + * <b>Important:</b> This method is discouraged to be used, as its better to invoke the asynchronous + * {@link AsyncProcessor#process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method, whenever possible. * * @param processor the processor * @param exchange the exchange http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java index 1496503..a20f73a 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java @@ -30,7 +30,6 @@ import org.apache.camel.Processor; import org.apache.camel.RollbackExchangeException; import org.apache.camel.RuntimeCamelException; import org.apache.camel.util.AsyncProcessorConverterHelper; -import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +111,7 @@ public class EndpointMessageListener implements MessageListener { if (LOG.isTraceEnabled()) { LOG.trace("Processing exchange {} asynchronously", exchange.getExchangeId()); } - boolean sync = AsyncProcessorHelper.process(processor, exchange, callback); + boolean sync = processor.process(exchange, callback); if (!sync) { // will be done async so return now return; http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java index 2a1fb38..b227240 100644 --- a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java +++ b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java @@ -26,8 +26,6 @@ import org.apache.camel.Producer; import org.apache.camel.ProducerTemplate; import org.apache.camel.component.routebox.RouteboxServiceSupport; import org.apache.camel.component.routebox.strategy.RouteboxDispatcher; -import org.apache.camel.util.AsyncProcessorConverterHelper; -import org.apache.camel.util.AsyncProcessorHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,18 +68,9 @@ public class RouteboxDirectProducer extends RouteboxServiceSupport implements Pr RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer); exchange = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange); if (getRouteboxEndpoint().getConfig().isSendToConsumer()) { - AsyncProcessor processor = AsyncProcessorConverterHelper.convert(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getProcessor()); - flag = AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() { - public void done(boolean doneSync) { - // we only have to handle async completion of this policy - if (doneSync) { - return; - } - callback.done(false); - } - }); + AsyncProcessor processor = ((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getAsyncProcessor(); + flag = processor.process(exchange, callback); } - callback.done(true); } catch (Exception e) { getExceptionHandler().handleException("Error processing exchange", exchange, e); } http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java index 9db3a42..3c74870 100644 --- a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java +++ b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java @@ -31,7 +31,6 @@ import org.apache.camel.component.routebox.RouteboxServiceSupport; import org.apache.camel.component.routebox.strategy.RouteboxDispatcher; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.util.AsyncProcessorConverterHelper; -import org.apache.camel.util.AsyncProcessorHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,7 +92,7 @@ public class RouteboxSedaConsumer extends RouteboxServiceSupport implements Rout LOG.debug("Dispatching to inner route: {}", exchange); RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer); result = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange); - AsyncProcessorHelper.process(processor, result, new AsyncCallback() { + processor.process(result, new AsyncCallback() { public void done(boolean doneSync) { // noop } http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java index b21de49..53ca48c 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java @@ -22,7 +22,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.spi.Synchronization; -import org.apache.camel.util.AsyncProcessorHelper; /** * An InOnly {@link AbstractMessageHandler} @@ -66,7 +65,7 @@ public class InOnlyMessageHandler extends AbstractMessageHandler { log.debug("Synchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri()); } try { - AsyncProcessorHelper.process(getProcessor(), exchange); + getProcessor().process(exchange); } catch (Exception e) { exchange.setException(e); } finally { @@ -75,13 +74,8 @@ public class InOnlyMessageHandler extends AbstractMessageHandler { } else { // process asynchronous using the async routing engine log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri()); - boolean sync = false; - sync = AsyncProcessorHelper.process(getProcessor(), exchange, callback); - if (!sync) { - // will be done async so return now - return; - } + getProcessor().process(exchange, callback); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java index bc38179..26b9a08 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java @@ -37,7 +37,6 @@ import org.apache.camel.component.sjms.SjmsExchangeMessageHelper; import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.spi.Synchronization; -import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ObjectHelper; /** @@ -118,7 +117,7 @@ public class InOutMessageHandler extends AbstractMessageHandler { // do so log.debug("Synchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri()); try { - AsyncProcessorHelper.process(getProcessor(), exchange); + getProcessor().process(exchange); } catch (Exception e) { exchange.setException(e); } finally { @@ -127,11 +126,7 @@ public class InOutMessageHandler extends AbstractMessageHandler { } else { // process asynchronous using the async routing engine log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri()); - boolean sync = AsyncProcessorHelper.process(getProcessor(), exchange, callback); - if (!sync) { - // will be done async so return now - return; - } + getProcessor().process(exchange, callback); } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java index f072892..27721e8 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java @@ -16,6 +16,7 @@ */ package org.apache.camel.spring.spi; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.AsyncCallback; @@ -27,6 +28,7 @@ import org.apache.camel.Processor; import org.apache.camel.processor.RedeliveryErrorHandler; import org.apache.camel.processor.RedeliveryPolicy; import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; +import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.CamelLogger; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; @@ -213,18 +215,31 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler { * @param exchange the exchange */ protected void processByErrorHandler(final Exchange exchange) { - // must invoke the async method with empty callback to have it invoke the - // super.processErrorHandler - // we are transacted so we have to route synchronously so don't worry about returned - // value from the process method - // and the camel routing engine will detect this is an transacted Exchange and route - // it fully synchronously so we don't have to wait here if we hit an async endpoint - // all that is taken care of in the camel-core - super.process(exchange, new AsyncCallback() { + final CountDownLatch latch = new CountDownLatch(1); + boolean sync = super.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { - // noop + if (!doneSync) { + log.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId()); + latch.countDown(); + } + } + + @Override + public String toString() { + return "Done " + this; } }); + if (!sync) { + log.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", + exchange.getExchangeId(), exchange); + try { + latch.await(); + } catch (InterruptedException e) { + exchange.setException(e); + } + log.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}", + exchange.getExchangeId(), exchange); + } } /** http://git-wip-us.apache.org/repos/asf/camel/blob/ee5487e3/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceAsyncTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceAsyncTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceAsyncTest.java new file mode 100644 index 0000000..641c498 --- /dev/null +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceAsyncTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.interceptor; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.async.MyAsyncComponent; +import org.apache.camel.spring.SpringRouteBuilder; +import org.apache.camel.spring.spi.SpringTransactionPolicy; + +/** + * Unit test to demonstrate the transactional client pattern. + */ +public class TransactionalClientDataSourceAsyncTest extends TransactionalClientDataSourceTest { + + public void testTransactionRollback() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:error"); + mock.expectedMessageCount(1); + + try { + template.sendBody("direct:fail", "Hello World"); + fail("Should have thrown exception"); + } catch (RuntimeCamelException e) { + // expected as we fail + assertIsInstanceOf(RuntimeCamelException.class, e.getCause()); + assertTrue(e.getCause().getCause() instanceof IllegalArgumentException); + assertEquals("We don't have Donkeys, only Camels", e.getCause().getCause().getMessage()); + } + + assertMockEndpointsSatisfied(); + + int count = jdbc.queryForInt("select count(*) from books"); + assertEquals("Number of books", 1, count); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new SpringRouteBuilder() { + public void configure() throws Exception { + + context.addComponent("async", new MyAsyncComponent()); + + // use required as transaction policy + SpringTransactionPolicy required = lookup("PROPAGATION_REQUIRED", SpringTransactionPolicy.class); + + // configure to use transaction error handler and pass on the required as it will fetch + // the transaction manager from it that it needs + errorHandler(transactionErrorHandler(required)); + + // on exception is also supported + onException(IllegalArgumentException.class).handled(false).to("mock:error"); + + from("direct:okay") + .policy(required) + .setBody(constant("Tiger in Action")).beanRef("bookService") + .log("Before thread ${threadName}") + .to("async:bye:camel") + .log("After thread ${threadName}") + .setBody(constant("Elephant in Action")).beanRef("bookService"); + + from("direct:fail") + .policy(required) + .setBody(constant("Tiger in Action")).beanRef("bookService") + .log("Before thread ${threadName}") + .to("async:bye:camel") + .log("After thread ${threadName}") + .setBody(constant("Donkey in Action")).beanRef("bookService"); + } + }; + } + +}