Updated Branches:
  refs/heads/master e2b22608c -> 3671004c6

CAMEL-6377: Optimized routing engine to reduce stack frames in use during 
routing. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fb15fdee
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fb15fdee
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fb15fdee

Branch: refs/heads/master
Commit: fb15fdee4b48d1cc9f8e8db6abf4048da4558308
Parents: e2b2260
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat May 18 16:48:25 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat May 18 16:48:25 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/seda/SedaConsumer.java  |    5 +-
 .../org/apache/camel/impl/DefaultRouteContext.java |   27 +-
 .../camel/management/InstrumentationProcessor.java |    1 +
 .../java/org/apache/camel/model/ModelChannel.java  |    1 +
 .../apache/camel/model/TransactedDefinition.java   |    1 +
 .../camel/processor/CamelInternalProcessor.java    |  412 +++++++++++++++
 .../processor/CamelInternalProcessorTask.java      |   49 ++
 .../camel/processor/DelegateAsyncProcessor.java    |    2 +-
 .../java/org/apache/camel/processor/Pipeline.java  |    3 +-
 .../camel/processor/RedeliveryErrorHandler.java    |   23 +-
 .../camel/processor/RouteContextProcessor.java     |    1 +
 .../RouteInflightRepositoryProcessor.java          |    1 +
 .../camel/processor/RoutePolicyProcessor.java      |    1 +
 .../apache/camel/processor/ThreadsProcessor.java   |   13 +
 .../camel/processor/UnitOfWorkProcessor.java       |    1 +
 .../camel/processor/interceptor/BacklogTracer.java |   10 +-
 .../interceptor/BacklogTracerInterceptor.java      |    1 +
 .../processor/interceptor/DefaultChannel.java      |   43 +-
 .../apache/camel/util/AsyncProcessorHelper.java    |    3 +
 .../ReduceStacksNeededDuringRoutingTest.java       |   79 +++
 ...taSourceTransactedWithLocalOnExceptionTest.java |    4 +-
 21 files changed, 634 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 1b7f42b..ac26109 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -38,7 +38,6 @@ import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.UnitOfWorkHelper;
@@ -266,7 +265,7 @@ public class SedaConsumer extends ServiceSupport implements 
Consumer, Runnable,
             ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this);
 
             // and use the asynchronous routing engine to support it
-            AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
+            mp.process(exchange, new AsyncCallback() {
                 public void done(boolean doneSync) {
                     // done the uow on the completions
                     UnitOfWorkHelper.doneSynchronizations(exchange, 
completions, LOG);
@@ -274,7 +273,7 @@ public class SedaConsumer extends ServiceSupport implements 
Consumer, Runnable,
             });
         } else {
             // use the regular processor and use the asynchronous routing 
engine to support it
-            AsyncProcessorHelper.process(processor, exchange, new 
AsyncCallback() {
+            processor.process(exchange, new AsyncCallback() {
                 public void done(boolean doneSync) {
                     // noop
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
index 460a2d4..0531e6c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
@@ -34,9 +34,8 @@ import org.apache.camel.management.InstrumentationProcessor;
 import org.apache.camel.model.FromDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.Pipeline;
-import org.apache.camel.processor.RouteInflightRepositoryProcessor;
-import org.apache.camel.processor.RoutePolicyProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.RouteContext;
@@ -154,8 +153,9 @@ public class DefaultRouteContext implements RouteContext {
             // and wrap it in a unit of work so the UoW is on the top, so the 
entire route will be in the same UoW
             UnitOfWorkProcessor unitOfWorkProcessor = new 
UnitOfWorkProcessor(this, target);
 
+            CamelInternalProcessor internal = new 
CamelInternalProcessor(unitOfWorkProcessor);
+
             // and then optionally add route policy processor if a custom 
policy is set
-            RoutePolicyProcessor routePolicyProcessor = null;
             List<RoutePolicy> routePolicyList = getRoutePolicyList();
             if (routePolicyList != null && !routePolicyList.isEmpty()) {
                 for (RoutePolicy policy : routePolicyList) {
@@ -169,25 +169,23 @@ public class DefaultRouteContext implements RouteContext {
                         }
                     }
                 }
-                routePolicyProcessor = new 
RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList);
-                target = routePolicyProcessor;
-            } else {
-                target = unitOfWorkProcessor;
+
+                internal.addTask(new 
CamelInternalProcessor.RoutePolicyTask(routePolicyList));
             }
 
             // wrap in route inflight processor to track number of inflight 
exchanges for the route
-            RouteInflightRepositoryProcessor inflight = new 
RouteInflightRepositoryProcessor(camelContext.getInflightRepository(), target);
+            String routeId = 
route.idOrCreate(getCamelContext().getNodeIdFactory());
+            internal.addTask(new 
CamelInternalProcessor.RouteInflightRepositoryTask(camelContext.getInflightRepository(),
 routeId));
 
+            // TODO: This should be a task as well
             // and wrap it by a instrumentation processor that is to be used 
for performance stats
             // for this particular route
             InstrumentationProcessor instrument = new 
InstrumentationProcessor();
             instrument.setType("route");
-            instrument.setProcessor(inflight);
+            instrument.setProcessor(internal);
 
             // and create the route that wraps the UoW
             Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), 
instrument);
-            // create the route id
-            String routeId = 
route.idOrCreate(getCamelContext().getNodeIdFactory());
             edcr.getProperties().put(Route.ID_PROPERTY, routeId);
             edcr.getProperties().put(Route.PARENT_PROPERTY, 
Integer.toHexString(route.hashCode()));
             if (route.getGroup() != null) {
@@ -195,11 +193,10 @@ public class DefaultRouteContext implements RouteContext {
             }
 
             // after the route is created then set the route on the policy 
processor so we get hold of it
-            if (routePolicyProcessor != null) {
-                routePolicyProcessor.setRoute(edcr);
+            CamelInternalProcessor.RoutePolicyTask task = 
internal.getTask(CamelInternalProcessor.RoutePolicyTask.class);
+            if (task != null) {
+                task.setRoute(edcr);
             }
-            // after the route is created then set the route on the inflight 
processor so we get hold of it
-            inflight.setRoute(edcr);
 
             // invoke init on route policy
             if (routePolicyList != null && !routePolicyList.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
index a406cf7..18e6dd6 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
  *
  * @version 
  */
+@Deprecated
 public class InstrumentationProcessor extends DelegateAsyncProcessor {
 
     private static final transient Logger LOG = 
LoggerFactory.getLogger(InstrumentationProcessor.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java 
b/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java
index 82f4d52..519558d 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java
@@ -20,6 +20,7 @@ import org.apache.camel.Channel;
 import org.apache.camel.spi.RouteContext;
 
 public interface ModelChannel extends Channel {
+
     /**
      * Initializes the channel.
      *

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/model/TransactedDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/TransactedDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/TransactedDefinition.java
index 81632f7..5a52b56 100644
--- a/camel-core/src/main/java/org/apache/camel/model/TransactedDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/TransactedDefinition.java
@@ -147,6 +147,7 @@ public class TransactedDefinition extends 
OutputDefinition<TransactedDefinition>
         // wrap
         Processor target = policy.wrap(routeContext, childProcessor);
 
+        // TODO: should not be needed as its already a service
         // wrap the target so it becomes a service and we can manage its 
lifecycle
         WrapProcessor wrap = new WrapProcessor(target, childProcessor);
         return wrap;

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
new file mode 100644
index 0000000..52c6807
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.StatefulService;
+import org.apache.camel.api.management.PerformanceCounter;
+import org.apache.camel.management.DelegatePerformanceCounter;
+import org.apache.camel.management.mbean.ManagedPerformanceCounter;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.interceptor.BacklogTracer;
+import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage;
+import org.apache.camel.spi.InflightRepository;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal {@link Processor} that Camel routing engine used during routing 
for cross cutting functionality such as:
+ * <ul>
+ *     <li>Keeping track which route currently is being routed</li>
+ *     <li>Gather JMX performance statics</li>
+ *     <li>Tracing the routing using</li>
+ *     <li>Execute {@link RoutePolicy}</li>
+ * </ul>
+ * ... and much more.
+ * <p/>
+ * This implementation executes this cross cutting functionality as a {@link 
CamelInternalProcessorTask} task
+ * by executing the {@link 
CamelInternalProcessorTask#before(org.apache.camel.Exchange)} and
+ * {@link CamelInternalProcessorTask#after(org.apache.camel.Exchange, Object)} 
callbacks in correct order during routing.
+ * This reduces number of stack frames needed during routing, and reduce the 
number of lines in stacktraces, as well
+ * makes debugging the routing engine easier for end users.
+ */
+public final class CamelInternalProcessor extends DelegateAsyncProcessor {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelInternalProcessor.class);
+    private final List<CamelInternalProcessorTask> tasks = new 
ArrayList<CamelInternalProcessorTask>();
+
+    public CamelInternalProcessor() {
+    }
+
+    public CamelInternalProcessor(Processor processor) {
+        super(processor);
+    }
+
+    public CamelInternalProcessor(AsyncProcessor processor) {
+        super(processor);
+    }
+
+    public void addTask(CamelInternalProcessorTask task) {
+        tasks.add(task);
+    }
+
+    public void addFirstTask(CamelInternalProcessorTask task) {
+        tasks.add(0, task);
+    }
+
+    public <T> T getTask(Class<T> type) {
+        for (CamelInternalProcessorTask task : tasks) {
+            if (type.isInstance(task)) {
+                return type.cast(task);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (processor == null) {
+            // no processor then we are done
+            callback.done(true);
+            return true;
+        }
+
+        final List<Object> states = new ArrayList<Object>(tasks.size());
+        for (CamelInternalProcessorTask task : tasks) {
+            try {
+                Object state = task.before(exchange);
+                states.add(state);
+            } catch (Throwable e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+        }
+
+        callback = new InternalCallback(states, exchange, callback);
+
+        if (exchange.isTransacted()) {
+            // must be synchronized for transacted exchanges
+            LOG.trace("Transacted Exchange must be routed synchronously for 
exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+            try {
+                processor.process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            }
+            callback.done(true);
+            return true;
+        } else {
+            final UnitOfWork uow = exchange.getUnitOfWork();
+
+            // allow unit of work to wrap callback in case it need to do some 
special work
+            // for example the MDCUnitOfWork
+            AsyncCallback async = callback;
+            if (uow != null) {
+                async = uow.beforeProcess(processor, exchange, callback);
+            }
+
+            boolean sync = processor.process(exchange, async);
+
+            // execute any after processor work (in current thread, not in the 
callback)
+            if (uow != null) {
+                uow.afterProcess(processor, exchange, callback, sync);
+            }
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Exchange processed and is continued routed {} for 
exchangeId: {} -> {}",
+                        new Object[]{sync ? "synchronously" : 
"asynchronously", exchange.getExchangeId(), exchange});
+            }
+            return sync;
+        }
+    }
+
+    private final class InternalCallback implements AsyncCallback {
+
+        private final List<Object> states;
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+
+        private InternalCallback(List<Object> states, Exchange exchange, 
AsyncCallback callback) {
+            this.states = states;
+            this.exchange = exchange;
+            this.callback = callback;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            // we should call after in reverse order
+            for (int i = tasks.size() - 1; i >= 0; i--) {
+                CamelInternalProcessorTask task = tasks.get(i);
+                Object state = states.get(i);
+                try {
+                    task.after(exchange, state);
+                } catch (Throwable e) {
+                    exchange.setException(e);
+                    break;
+                }
+            }
+            callback.done(doneSync);
+        }
+    }
+
+    public static class InstrumentationTask implements 
CamelInternalProcessorTask<StopWatch> {
+
+        private PerformanceCounter counter;
+        private String type;
+
+        public void setCounter(Object counter) {
+            ManagedPerformanceCounter mpc = null;
+            if (counter instanceof ManagedPerformanceCounter) {
+                mpc = (ManagedPerformanceCounter) counter;
+            }
+
+            if (this.counter instanceof DelegatePerformanceCounter) {
+                ((DelegatePerformanceCounter) this.counter).setCounter(mpc);
+            } else if (mpc != null) {
+                this.counter = mpc;
+            } else if (counter instanceof PerformanceCounter) {
+                this.counter = (PerformanceCounter) counter;
+            }
+        }
+
+        protected void recordTime(Exchange exchange, long duration) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("{}Recording duration: {} millis for exchange: {}", 
new Object[]{type != null ? type + ": " : "", duration, exchange});
+            }
+
+            if (!exchange.isFailed() && exchange.getException() == null) {
+                counter.completedExchange(exchange, duration);
+            } else {
+                counter.failedExchange(exchange);
+            }
+        }
+
+        public String getType() {
+            return type;
+        }
+
+        public void setType(String type) {
+            this.type = type;
+        }
+
+        @Override
+        public StopWatch before(Exchange exchange) throws Exception {
+            // only record time if stats is enabled
+            return (counter != null && counter.isStatisticsEnabled()) ? new 
StopWatch() : null;
+        }
+
+        @Override
+        public void after(Exchange exchange, StopWatch watch) throws Exception 
{
+            // record end time
+            if (watch != null) {
+                recordTime(exchange, watch.stop());
+            }
+        }
+    }
+
+    public static class RouteContextTask implements 
CamelInternalProcessorTask<UnitOfWork> {
+
+        private final RouteContext routeContext;
+
+        public RouteContextTask(RouteContext routeContext) {
+            this.routeContext = routeContext;
+        }
+
+        @Override
+        public UnitOfWork before(Exchange exchange) throws Exception {
+            // push the current route context
+            final UnitOfWork unitOfWork = exchange.getUnitOfWork();
+            if (unitOfWork != null) {
+                unitOfWork.pushRouteContext(routeContext);
+            }
+            return unitOfWork;
+        }
+
+        @Override
+        public void after(Exchange exchange, UnitOfWork unitOfWork) throws 
Exception {
+            if (unitOfWork != null) {
+                unitOfWork.popRouteContext();
+            }
+        }
+    }
+
+    public static class RouteInflightRepositoryTask implements 
CamelInternalProcessorTask {
+
+        private final InflightRepository inflightRepository;
+        private final String id;
+
+        public RouteInflightRepositoryTask(InflightRepository 
inflightRepository, String id) {
+            this.inflightRepository = inflightRepository;
+            this.id = id;
+        }
+
+        @Override
+        public Object before(Exchange exchange) throws Exception {
+            inflightRepository.add(exchange, id);
+            return null;
+        }
+
+        @Override
+        public void after(Exchange exchange, Object state) throws Exception {
+            inflightRepository.remove(exchange, id);
+        }
+    }
+
+    public static class RoutePolicyTask implements CamelInternalProcessorTask {
+
+        private final List<RoutePolicy> routePolicies;
+        private Route route;
+
+        public RoutePolicyTask(List<RoutePolicy> routePolicies) {
+            this.routePolicies = routePolicies;
+        }
+
+        public void setRoute(Route route) {
+            this.route = route;
+        }
+
+        /**
+         * Strategy to determine if this policy is allowed to run
+         *
+         * @param policy the policy
+         * @return <tt>true</tt> to run
+         */
+        protected boolean isRoutePolicyRunAllowed(RoutePolicy policy) {
+            if (policy instanceof StatefulService) {
+                StatefulService ss = (StatefulService) policy;
+                return ss.isRunAllowed();
+            }
+            return true;
+        }
+
+        @Override
+        public Object before(Exchange exchange) throws Exception {
+            // invoke begin
+            for (RoutePolicy policy : routePolicies) {
+                try {
+                    if (isRoutePolicyRunAllowed(policy)) {
+                        policy.onExchangeBegin(route, exchange);
+                    }
+                } catch (Exception e) {
+                    LOG.warn("Error occurred during onExchangeBegin on 
RoutePolicy: " + policy
+                            + ". This exception will be ignored", e);
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public void after(Exchange exchange, Object data) throws Exception {
+            // do not invoke it if Camel is stopping as we don't want
+            // the policy to start a consumer during Camel is stopping
+            if (isCamelStopping(exchange.getContext())) {
+                return;
+            }
+
+            for (RoutePolicy policy : routePolicies) {
+                try {
+                    if (isRoutePolicyRunAllowed(policy)) {
+                        policy.onExchangeDone(route, exchange);
+                    }
+                } catch (Exception e) {
+                    LOG.warn("Error occurred during onExchangeDone on 
RoutePolicy: " + policy
+                            + ". This exception will be ignored", e);
+                }
+            }
+        }
+
+        private static boolean isCamelStopping(CamelContext context) {
+            if (context instanceof StatefulService) {
+                StatefulService ss = (StatefulService) context;
+                return ss.isStopping() || ss.isStopped();
+            }
+            return false;
+        }
+    }
+
+    public static final class BacklogTracerTask implements 
CamelInternalProcessorTask {
+
+        private final Queue<DefaultBacklogTracerEventMessage> queue;
+        private final BacklogTracer backlogTracer;
+        private final ProcessorDefinition<?> processorDefinition;
+        private final ProcessorDefinition<?> routeDefinition;
+        private final boolean first;
+
+        public BacklogTracerTask(Queue<DefaultBacklogTracerEventMessage> 
queue, BacklogTracer backlogTracer,
+                                 ProcessorDefinition<?> processorDefinition, 
ProcessorDefinition<?> routeDefinition, boolean first) {
+            this.queue = queue;
+            this.backlogTracer = backlogTracer;
+            this.processorDefinition = processorDefinition;
+            this.routeDefinition = routeDefinition;
+            this.first = first;
+        }
+
+        @Override
+        public Object before(Exchange exchange) throws Exception {
+            if (backlogTracer.shouldTrace(processorDefinition, exchange)) {
+                // ensure there is space on the queue
+                int drain = queue.size() - backlogTracer.getBacklogSize();
+                // and we need room for ourselves and possible also a first 
pseudo message as well
+                drain += first ? 2 : 1;
+                if (drain > 0) {
+                    for (int i = 0; i < drain; i++) {
+                        queue.poll();
+                    }
+                }
+
+                Date timestamp = new Date();
+                String toNode = processorDefinition.getId();
+                String exchangeId = exchange.getExchangeId();
+                String messageAsXml = 
MessageHelper.dumpAsXml(exchange.getIn(), true, 4,
+                        backlogTracer.isBodyIncludeStreams(), 
backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars());
+
+                // if first we should add a pseudo trace message as well, so 
we have a starting message (eg from the route)
+                String routeId = routeDefinition.getId();
+                if (first) {
+                    Date created = 
exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class);
+                    DefaultBacklogTracerEventMessage pseudo = new 
DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), 
created, routeId, null, exchangeId, messageAsXml);
+                    queue.add(pseudo);
+                }
+                DefaultBacklogTracerEventMessage event = new 
DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), 
timestamp, routeId, toNode, exchangeId, messageAsXml);
+                queue.add(event);
+            }
+
+            return null;
+        }
+
+        @Override
+        public void after(Exchange exchange, Object data) throws Exception {
+            // noop
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessorTask.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessorTask.java
 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessorTask.java
new file mode 100644
index 0000000..1df3db9
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessorTask.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+
+/**
+ * A task to execute cross cutting functionality in the Camel routing engine.
+ * <p/>
+ * The Camel routing engine will execute the {@link 
#before(org.apache.camel.Exchange)} and
+ * {@link #after(org.apache.camel.Exchange, Object)} methods during routing in 
correct order.
+ *
+ * @param <T>
+ * @see CamelInternalProcessor
+ */
+public interface CamelInternalProcessorTask<T> {
+
+    /**
+     * Callback executed before processing a step in the route.
+     *
+     * @param exchange  the current exchange
+     * @return any state to keep and provide as data to the {@link 
#after(org.apache.camel.Exchange, Object)} method, or use <tt>null</tt> for no 
state.
+     * @throws Exception is thrown if error during the call.
+     */
+    T before(Exchange exchange) throws Exception;
+
+    /**
+     * Callback executed after processing a step in the route.
+     *
+     * @param exchange  the current exchange
+     * @param data      the state, if any, returned in the {@link 
#before(org.apache.camel.Exchange)} method.
+     * @throws Exception is thrown if error during the call.
+     */
+    void after(Exchange exchange, T data) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
index 716505b..a49dff4 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
@@ -96,7 +96,7 @@ public class DelegateAsyncProcessor extends ServiceSupport 
implements DelegatePr
             callback.done(true);
             return true;
         }
-        return AsyncProcessorHelper.process(processor, exchange, callback);
+        return processor.process(exchange, callback);
     }
 
     public boolean hasNext() {

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java 
b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
index 3ace4c5..2850cc6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -114,7 +114,8 @@ public class Pipeline extends MulticastProcessor implements 
AsyncProcessor, Trac
 
         // implement asynchronous routing logic in callback so we can have the 
callback being
         // triggered and then continue routing where we left
-        boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange, 
new AsyncCallback() {
+        //boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange,
+        boolean sync = asyncProcessor.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 // we only have to handle async completion of the pipeline
                 if (doneSync) {

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index f34b467..02b56a8 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -126,7 +126,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
                 // this redelivery task was scheduled from synchronous, which 
we forced to be asynchronous from
                 // this error handler, which means we have to invoke the 
callback with false, to have the callback
                 // be notified when we are done
-                sync = AsyncProcessorHelper.process(outputAsync, exchange, new 
AsyncCallback() {
+                sync = outputAsync.process(exchange, new AsyncCallback() {
                     public void done(boolean doneSync) {
                         log.trace("Redelivering exchangeId: {} done sync: {}", 
exchange.getExchangeId(), doneSync);
 
@@ -147,7 +147,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
             } else {
                 // this redelivery task was scheduled from asynchronous, which 
means we should only
                 // handle when the asynchronous task was done
-                sync = AsyncProcessorHelper.process(outputAsync, exchange, new 
AsyncCallback() {
+                sync = outputAsync.process(exchange, new AsyncCallback() {
                     public void done(boolean doneSync) {
                         log.trace("Redelivering exchangeId: {} done sync: {}", 
exchange.getExchangeId(), doneSync);
 
@@ -388,7 +388,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
             }
 
             // process the exchange (also redelivery)
-            boolean sync = AsyncProcessorHelper.process(outputAsync, exchange, 
new AsyncCallback() {
+            boolean sync = outputAsync.process(exchange, new AsyncCallback() {
                 public void done(boolean sync) {
                     // this callback should only handle the async case
                     if (sync) {
@@ -483,12 +483,15 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
             boolean deliver = true;
 
             // the unit of work may have an optional callback associated we 
need to leverage
-            SubUnitOfWorkCallback uowCallback = 
exchange.getUnitOfWork().getSubUnitOfWorkCallback();
-            if (uowCallback != null) {
-                // signal to the callback we are exhausted
-                uowCallback.onExhausted(exchange);
-                // do not deliver to the failure processor as its been handled 
by the callback instead
-                deliver = false;
+            UnitOfWork uow = exchange.getUnitOfWork();
+            if (uow != null) {
+                SubUnitOfWorkCallback uowCallback = 
uow.getSubUnitOfWorkCallback();
+                if (uowCallback != null) {
+                    // signal to the callback we are exhausted
+                    uowCallback.onExhausted(exchange);
+                    // do not deliver to the failure processor as its been 
handled by the callback instead
+                    deliver = false;
+                }
             }
 
             if (deliver) {
@@ -825,7 +828,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
 
             // the failure processor could also be asynchronous
             AsyncProcessor afp = 
AsyncProcessorConverterHelper.convert(processor);
-            sync = AsyncProcessorHelper.process(afp, exchange, new 
AsyncCallback() {
+            sync = afp.process(exchange, new AsyncCallback() {
                 public void done(boolean sync) {
                     log.trace("Failure processor done: {} processing Exchange: 
{}", processor, exchange);
                     try {

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java
index 6a9d98e..aa6eff2 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java
@@ -26,6 +26,7 @@ import org.apache.camel.spi.UnitOfWork;
  * This processor tracks the current {@link RouteContext} while processing the 
{@link Exchange}.
  * This ensures that the {@link Exchange} have details under which route its 
being currently processed.
  */
+@Deprecated
 public class RouteContextProcessor extends DelegateAsyncProcessor {
 
     private final RouteContext routeContext;

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
index d236e96..42da65c 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
@@ -27,6 +27,7 @@ import org.apache.camel.spi.RouteContext;
  * This processor tracks the current {@link RouteContext} while processing the 
{@link Exchange}.
  * This ensures that the {@link Exchange} have details under which route its 
being currently processed.
  */
+@Deprecated
 public class RouteInflightRepositoryProcessor extends DelegateAsyncProcessor {
     
     private final InflightRepository inflightRepository;

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java
index 8ca0bf3..37d7ffb 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
  *
  * @version 
  */
+@Deprecated
 public class RoutePolicyProcessor extends DelegateAsyncProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RoutePolicyProcessor.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
index 26226fe..183cfa0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
@@ -36,6 +36,11 @@ import org.slf4j.LoggerFactory;
  * Threads processor that leverage a thread pool for continue processing the 
{@link Exchange}s
  * using the asynchronous routing engine.
  * <p/>
+ * <b>Notice:</b> For transacted routes then this {@link ThreadsProcessor} is 
not in use, as we want to
+ * process messages using the same thread to support all work done in the same 
transaction. The reason
+ * is that the transaction manager that orchestrate the transaction, requires 
all the work to be done
+ * on the same thread.
+ * <p/>
  * Pay attention to how this processor handles rejected tasks.
  * <ul>
  * <li>Abort - The current exchange will be set with a {@link 
RejectedExecutionException} exception,
@@ -120,6 +125,14 @@ public class ThreadsProcessor extends ServiceSupport 
implements AsyncProcessor {
             throw new IllegalStateException("ThreadsProcessor is not 
running.");
         }
 
+        // we cannot execute this asynchronously for transacted exchanges, as 
the transaction manager doesn't support
+        // using different threads in the same transaction
+        if (exchange.isTransacted()) {
+            LOG.trace("Transacted Exchange must be routed synchronously for 
exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+            callback.done(true);
+            return true;
+        }
+
         ProcessCall call = new ProcessCall(exchange, callback);
         try {
             LOG.trace("Submitting task {}", call);

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
index a4ba407..0e37fab 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
@@ -82,6 +82,7 @@ public class UnitOfWorkProcessor extends 
DelegateAsyncProcessor {
         // if a route context has been configured, then wrap the processor 
with a
         // RouteContextProcessor to ensure we track the route context properly 
during
         // processing of the exchange, but only do this once
+        // TODO: This can possible be removed!
         if (routeContext != null && (!(processor instanceof 
RouteContextProcessor))) {
             processor = new RouteContextProcessor(routeContext, processor);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
index 5c156b0..231628f 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
@@ -75,6 +75,14 @@ public class BacklogTracer extends ServiceSupport implements 
InterceptStrategy {
         this.camelContext = camelContext;
     }
 
+    public void addDefinition(ProcessorDefinition<?> definition) {
+        processors.add(definition);
+    }
+
+    public Queue<DefaultBacklogTracerEventMessage> getQueue() {
+        return queue;
+    }
+
     @Override
     public Processor wrapProcessorInInterceptors(CamelContext context, 
ProcessorDefinition<?> definition, Processor target, Processor nextTarget) 
throws Exception {
         // is this the first output from a route, as we want to know this so 
we can do special logic in first
@@ -317,7 +325,7 @@ public class BacklogTracer extends ServiceSupport 
implements InterceptStrategy {
         queue.clear();
     }
 
-    long incrementTraceCounter() {
+    public long incrementTraceCounter() {
         return traceCounter.incrementAndGet();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracerInterceptor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracerInterceptor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracerInterceptor.java
index 9a02f36..c302b52 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracerInterceptor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracerInterceptor.java
@@ -29,6 +29,7 @@ import org.apache.camel.util.MessageHelper;
 /**
  * An interceptor for tracing messages by using the {@link BacklogTracer}.
  */
+@Deprecated
 public class BacklogTracerInterceptor extends DelegateAsyncProcessor {
 
     private final Queue<DefaultBacklogTracerEventMessage> queue;

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index b5a99a1..aecad64 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -32,15 +32,16 @@ import org.apache.camel.Processor;
 import org.apache.camel.Service;
 import org.apache.camel.model.ModelChannel;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ProcessorDefinitionHelper;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.InterceptorToAsyncProcessorBridge;
-import org.apache.camel.processor.RouteContextProcessor;
 import org.apache.camel.processor.WrapProcessor;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.OrderedComparator;
 import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
@@ -72,7 +73,7 @@ public class DefaultChannel extends ServiceSupport implements 
ModelChannel {
     private ProcessorDefinition<?> childDefinition;
     private CamelContext camelContext;
     private RouteContext routeContext;
-    private RouteContextProcessor routeContextProcessor;
+    private CamelInternalProcessor internalProcessor;
 
     public List<Processor> next() {
         List<Processor> answer = new ArrayList<Processor>(1);
@@ -148,20 +149,23 @@ public class DefaultChannel extends ServiceSupport 
implements ModelChannel {
 
     @Override
     protected void doStart() throws Exception {
-        // create route context processor to wrap output
-        routeContextProcessor = new RouteContextProcessor(routeContext, 
getOutput());
-        ServiceHelper.startServices(errorHandler, output, 
routeContextProcessor);
+        // the output has now been created, so assign the output to the 
internal processor
+        internalProcessor.setProcessor(getOutput());
+        ServiceHelper.startServices(errorHandler, output, internalProcessor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(output, errorHandler, 
routeContextProcessor);
+        ServiceHelper.stopServices(output, errorHandler, internalProcessor);
     }
 
     public void initChannel(ProcessorDefinition<?> outputDefinition, 
RouteContext routeContext) throws Exception {
         this.routeContext = routeContext;
         this.definition = outputDefinition;
         this.camelContext = routeContext.getCamelContext();
+        this.internalProcessor = new CamelInternalProcessor();
+        // TODO: The route context task can likely be only added in 
DefaultRouteContext once per route
+        this.internalProcessor.addTask(new 
CamelInternalProcessor.RouteContextTask(routeContext));
 
         Processor target = nextProcessor;
         Processor next;
@@ -194,8 +198,22 @@ public class DefaultChannel extends ServiceSupport 
implements ModelChannel {
         }
 
         // then wrap the output with the backlog and tracer (backlog first, as 
we do not want regular tracer to tracer the backlog)
-        BacklogTracerInterceptor backlog = (BacklogTracerInterceptor) 
getOrCreateBacklogTracer().wrapProcessorInInterceptors(routeContext.getCamelContext(),
 targetOutputDef, target, null);
-        TraceInterceptor trace = (TraceInterceptor) 
getOrCreateTracer().wrapProcessorInInterceptors(routeContext.getCamelContext(), 
targetOutputDef, backlog, null);
+        InterceptStrategy tracer = getOrCreateBacklogTracer();
+        if (tracer instanceof BacklogTracer) {
+            BacklogTracer backlogTracer = (BacklogTracer) tracer;
+            backlogTracer.addDefinition(targetOutputDef);
+
+            RouteDefinition route = 
ProcessorDefinitionHelper.getRoute(definition);
+            boolean first = false;
+            if (route != null && !route.getOutputs().isEmpty()) {
+                first = route.getOutputs().get(0) == definition;
+            }
+
+            internalProcessor.addTask(new 
CamelInternalProcessor.BacklogTracerTask(backlogTracer.getQueue(), 
backlogTracer, targetOutputDef, route, first));
+        }
+
+        // TODO: trace interceptor can be a task on internalProcessor
+        TraceInterceptor trace = (TraceInterceptor) 
getOrCreateTracer().wrapProcessorInInterceptors(routeContext.getCamelContext(), 
targetOutputDef, target, null);
         // trace interceptor need to have a reference to route context so we 
at runtime can enable/disable tracing on-the-fly
         trace.setRouteContext(routeContext);
         target = trace;
@@ -323,16 +341,13 @@ public class DefaultChannel extends ServiceSupport 
implements ModelChannel {
     }
 
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
-        Processor processor = getOutput();
-        if (processor == null || !continueProcessing(exchange)) {
+        if (!continueProcessing(exchange)) {
             // we should not continue routing so we are done
             callback.done(true);
             return true;
         }
 
-        // process the exchange using the route context processor
-        ObjectHelper.notNull(routeContextProcessor, "RouteContextProcessor", 
this);
-        return routeContextProcessor.process(exchange, callback);
+        return internalProcessor.process(exchange, callback);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java 
b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
index 42f42e3..0875dae 100644
--- a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
@@ -46,7 +46,10 @@ public final class AsyncProcessorHelper {
      * @param callback  the callback
      * @return <tt>true</tt> to continue execute synchronously, <tt>false</tt> 
to continue being executed asynchronously
      */
+    @Deprecated
     public static boolean process(final AsyncProcessor processor, final 
Exchange exchange, final AsyncCallback callback) {
+        // TODO: This method is no longer needed, and we can avoid using it
+
         boolean sync;
 
         if (exchange.isTransacted()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
new file mode 100644
index 0000000..3edebe4
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Scanner;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class ReduceStacksNeededDuringRoutingTest extends ContextTestSupport {
+
+    public void testReduceStacksNeeded() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                        .to("log:foo")
+                        .to("log:bar")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws 
Exception {
+                                try {
+                                    throw new IllegalArgumentException("Forced 
to dump stacktrace");
+                                } catch (Exception e) {
+                                    e.printStackTrace();
+
+                                    StringWriter sw = new StringWriter();
+                                    PrintWriter pw = new PrintWriter(sw);
+                                    e.printStackTrace(pw);
+
+                                    String s = sw.toString();
+                                    Scanner scanner = new Scanner(s);
+                                    scanner.useDelimiter("\n");
+                                    int count = 0;
+                                    while (scanner.hasNext()) {
+                                        scanner.next();
+                                        count++;
+                                    }
+                                    System.out.println("There is " + count + " 
lines in the stacktrace");
+                                }
+                            }
+                        })
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithLocalOnExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithLocalOnExceptionTest.java
 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithLocalOnExceptionTest.java
index 3c3d683..0b775f4 100644
--- 
a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithLocalOnExceptionTest.java
+++ 
b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithLocalOnExceptionTest.java
@@ -28,22 +28,22 @@ public class 
TransactionalClientDataSourceTransactedWithLocalOnExceptionTest ext
         return new SpringRouteBuilder() {
             public void configure() throws Exception {
                 from("direct:okay")
-                    .transacted()
                     // use local on exception
                     .onException(IllegalArgumentException.class)
                         .handled(false)
                         .to("mock:error")
                     .end()
+                    .transacted()
                     .setBody(constant("Tiger in 
Action")).beanRef("bookService")
                     .setBody(constant("Elephant in 
Action")).beanRef("bookService");
 
                 from("direct:fail")
-                    .transacted()
                     // use local on exception
                     .onException(IllegalArgumentException.class)
                         .handled(false)
                         .to("mock:error")
                     .end()
+                    .transacted()
                     .setBody(constant("Tiger in 
Action")).beanRef("bookService")
                     .setBody(constant("Donkey in 
Action")).beanRef("bookService");
             }

Reply via email to