Updated Branches: refs/heads/master e2b22608c -> 3671004c6
CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fb15fdee Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fb15fdee Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fb15fdee Branch: refs/heads/master Commit: fb15fdee4b48d1cc9f8e8db6abf4048da4558308 Parents: e2b2260 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat May 18 16:48:25 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat May 18 16:48:25 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/seda/SedaConsumer.java | 5 +- .../org/apache/camel/impl/DefaultRouteContext.java | 27 +- .../camel/management/InstrumentationProcessor.java | 1 + .../java/org/apache/camel/model/ModelChannel.java | 1 + .../apache/camel/model/TransactedDefinition.java | 1 + .../camel/processor/CamelInternalProcessor.java | 412 +++++++++++++++ .../processor/CamelInternalProcessorTask.java | 49 ++ .../camel/processor/DelegateAsyncProcessor.java | 2 +- .../java/org/apache/camel/processor/Pipeline.java | 3 +- .../camel/processor/RedeliveryErrorHandler.java | 23 +- .../camel/processor/RouteContextProcessor.java | 1 + .../RouteInflightRepositoryProcessor.java | 1 + .../camel/processor/RoutePolicyProcessor.java | 1 + .../apache/camel/processor/ThreadsProcessor.java | 13 + .../camel/processor/UnitOfWorkProcessor.java | 1 + .../camel/processor/interceptor/BacklogTracer.java | 10 +- .../interceptor/BacklogTracerInterceptor.java | 1 + .../processor/interceptor/DefaultChannel.java | 43 +- .../apache/camel/util/AsyncProcessorHelper.java | 3 + .../ReduceStacksNeededDuringRoutingTest.java | 79 +++ ...taSourceTransactedWithLocalOnExceptionTest.java | 4 +- 21 files changed, 634 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index 1b7f42b..ac26109 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -38,7 +38,6 @@ import org.apache.camel.spi.ShutdownAware; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; -import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.UnitOfWorkHelper; @@ -266,7 +265,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this); // and use the asynchronous routing engine to support it - AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() { + mp.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // done the uow on the completions UnitOfWorkHelper.doneSynchronizations(exchange, completions, LOG); @@ -274,7 +273,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, }); } else { // use the regular processor and use the asynchronous routing engine to support it - AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() { + processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // noop } http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java index 460a2d4..0531e6c 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java @@ -34,9 +34,8 @@ import org.apache.camel.management.InstrumentationProcessor; import org.apache.camel.model.FromDefinition; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.Pipeline; -import org.apache.camel.processor.RouteInflightRepositoryProcessor; -import org.apache.camel.processor.RoutePolicyProcessor; import org.apache.camel.processor.UnitOfWorkProcessor; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.RouteContext; @@ -154,8 +153,9 @@ public class DefaultRouteContext implements RouteContext { // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW UnitOfWorkProcessor unitOfWorkProcessor = new UnitOfWorkProcessor(this, target); + CamelInternalProcessor internal = new CamelInternalProcessor(unitOfWorkProcessor); + // and then optionally add route policy processor if a custom policy is set - RoutePolicyProcessor routePolicyProcessor = null; List<RoutePolicy> routePolicyList = getRoutePolicyList(); if (routePolicyList != null && !routePolicyList.isEmpty()) { for (RoutePolicy policy : routePolicyList) { @@ -169,25 +169,23 @@ public class DefaultRouteContext implements RouteContext { } } } - routePolicyProcessor = new RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList); - target = routePolicyProcessor; - } else { - target = unitOfWorkProcessor; + + internal.addTask(new CamelInternalProcessor.RoutePolicyTask(routePolicyList)); } // wrap in route inflight processor to track number of inflight exchanges for the route - RouteInflightRepositoryProcessor inflight = new RouteInflightRepositoryProcessor(camelContext.getInflightRepository(), target); + String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory()); + internal.addTask(new CamelInternalProcessor.RouteInflightRepositoryTask(camelContext.getInflightRepository(), routeId)); + // TODO: This should be a task as well // and wrap it by a instrumentation processor that is to be used for performance stats // for this particular route InstrumentationProcessor instrument = new InstrumentationProcessor(); instrument.setType("route"); - instrument.setProcessor(inflight); + instrument.setProcessor(internal); // and create the route that wraps the UoW Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), instrument); - // create the route id - String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory()); edcr.getProperties().put(Route.ID_PROPERTY, routeId); edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode())); if (route.getGroup() != null) { @@ -195,11 +193,10 @@ public class DefaultRouteContext implements RouteContext { } // after the route is created then set the route on the policy processor so we get hold of it - if (routePolicyProcessor != null) { - routePolicyProcessor.setRoute(edcr); + CamelInternalProcessor.RoutePolicyTask task = internal.getTask(CamelInternalProcessor.RoutePolicyTask.class); + if (task != null) { + task.setRoute(edcr); } - // after the route is created then set the route on the inflight processor so we get hold of it - inflight.setRoute(edcr); // invoke init on route policy if (routePolicyList != null && !routePolicyList.isEmpty()) { http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java index a406cf7..18e6dd6 100644 --- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; * * @version */ +@Deprecated public class InstrumentationProcessor extends DelegateAsyncProcessor { private static final transient Logger LOG = LoggerFactory.getLogger(InstrumentationProcessor.class); http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java b/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java index 82f4d52..519558d 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java +++ b/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java @@ -20,6 +20,7 @@ import org.apache.camel.Channel; import org.apache.camel.spi.RouteContext; public interface ModelChannel extends Channel { + /** * Initializes the channel. * http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/model/TransactedDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/TransactedDefinition.java b/camel-core/src/main/java/org/apache/camel/model/TransactedDefinition.java index 81632f7..5a52b56 100644 --- a/camel-core/src/main/java/org/apache/camel/model/TransactedDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/TransactedDefinition.java @@ -147,6 +147,7 @@ public class TransactedDefinition extends OutputDefinition<TransactedDefinition> // wrap Processor target = policy.wrap(routeContext, childProcessor); + // TODO: should not be needed as its already a service // wrap the target so it becomes a service and we can manage its lifecycle WrapProcessor wrap = new WrapProcessor(target, childProcessor); return wrap; http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java new file mode 100644 index 0000000..52c6807 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -0,0 +1,412 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Queue; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.StatefulService; +import org.apache.camel.api.management.PerformanceCounter; +import org.apache.camel.management.DelegatePerformanceCounter; +import org.apache.camel.management.mbean.ManagedPerformanceCounter; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.interceptor.BacklogTracer; +import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage; +import org.apache.camel.spi.InflightRepository; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.RoutePolicy; +import org.apache.camel.spi.UnitOfWork; +import org.apache.camel.util.MessageHelper; +import org.apache.camel.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as: + * <ul> + * <li>Keeping track which route currently is being routed</li> + * <li>Gather JMX performance statics</li> + * <li>Tracing the routing using</li> + * <li>Execute {@link RoutePolicy}</li> + * </ul> + * ... and much more. + * <p/> + * This implementation executes this cross cutting functionality as a {@link CamelInternalProcessorTask} task + * by executing the {@link CamelInternalProcessorTask#before(org.apache.camel.Exchange)} and + * {@link CamelInternalProcessorTask#after(org.apache.camel.Exchange, Object)} callbacks in correct order during routing. + * This reduces number of stack frames needed during routing, and reduce the number of lines in stacktraces, as well + * makes debugging the routing engine easier for end users. + */ +public final class CamelInternalProcessor extends DelegateAsyncProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class); + private final List<CamelInternalProcessorTask> tasks = new ArrayList<CamelInternalProcessorTask>(); + + public CamelInternalProcessor() { + } + + public CamelInternalProcessor(Processor processor) { + super(processor); + } + + public CamelInternalProcessor(AsyncProcessor processor) { + super(processor); + } + + public void addTask(CamelInternalProcessorTask task) { + tasks.add(task); + } + + public void addFirstTask(CamelInternalProcessorTask task) { + tasks.add(0, task); + } + + public <T> T getTask(Class<T> type) { + for (CamelInternalProcessorTask task : tasks) { + if (type.isInstance(task)) { + return type.cast(task); + } + } + return null; + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + if (processor == null) { + // no processor then we are done + callback.done(true); + return true; + } + + final List<Object> states = new ArrayList<Object>(tasks.size()); + for (CamelInternalProcessorTask task : tasks) { + try { + Object state = task.before(exchange); + states.add(state); + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; + } + } + + callback = new InternalCallback(states, exchange, callback); + + if (exchange.isTransacted()) { + // must be synchronized for transacted exchanges + LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); + try { + processor.process(exchange); + } catch (Throwable e) { + exchange.setException(e); + } + callback.done(true); + return true; + } else { + final UnitOfWork uow = exchange.getUnitOfWork(); + + // allow unit of work to wrap callback in case it need to do some special work + // for example the MDCUnitOfWork + AsyncCallback async = callback; + if (uow != null) { + async = uow.beforeProcess(processor, exchange, callback); + } + + boolean sync = processor.process(exchange, async); + + // execute any after processor work (in current thread, not in the callback) + if (uow != null) { + uow.afterProcess(processor, exchange, callback, sync); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}", + new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange}); + } + return sync; + } + } + + private final class InternalCallback implements AsyncCallback { + + private final List<Object> states; + private final Exchange exchange; + private final AsyncCallback callback; + + private InternalCallback(List<Object> states, Exchange exchange, AsyncCallback callback) { + this.states = states; + this.exchange = exchange; + this.callback = callback; + } + + @Override + public void done(boolean doneSync) { + // we should call after in reverse order + for (int i = tasks.size() - 1; i >= 0; i--) { + CamelInternalProcessorTask task = tasks.get(i); + Object state = states.get(i); + try { + task.after(exchange, state); + } catch (Throwable e) { + exchange.setException(e); + break; + } + } + callback.done(doneSync); + } + } + + public static class InstrumentationTask implements CamelInternalProcessorTask<StopWatch> { + + private PerformanceCounter counter; + private String type; + + public void setCounter(Object counter) { + ManagedPerformanceCounter mpc = null; + if (counter instanceof ManagedPerformanceCounter) { + mpc = (ManagedPerformanceCounter) counter; + } + + if (this.counter instanceof DelegatePerformanceCounter) { + ((DelegatePerformanceCounter) this.counter).setCounter(mpc); + } else if (mpc != null) { + this.counter = mpc; + } else if (counter instanceof PerformanceCounter) { + this.counter = (PerformanceCounter) counter; + } + } + + protected void recordTime(Exchange exchange, long duration) { + if (LOG.isTraceEnabled()) { + LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange}); + } + + if (!exchange.isFailed() && exchange.getException() == null) { + counter.completedExchange(exchange, duration); + } else { + counter.failedExchange(exchange); + } + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + @Override + public StopWatch before(Exchange exchange) throws Exception { + // only record time if stats is enabled + return (counter != null && counter.isStatisticsEnabled()) ? new StopWatch() : null; + } + + @Override + public void after(Exchange exchange, StopWatch watch) throws Exception { + // record end time + if (watch != null) { + recordTime(exchange, watch.stop()); + } + } + } + + public static class RouteContextTask implements CamelInternalProcessorTask<UnitOfWork> { + + private final RouteContext routeContext; + + public RouteContextTask(RouteContext routeContext) { + this.routeContext = routeContext; + } + + @Override + public UnitOfWork before(Exchange exchange) throws Exception { + // push the current route context + final UnitOfWork unitOfWork = exchange.getUnitOfWork(); + if (unitOfWork != null) { + unitOfWork.pushRouteContext(routeContext); + } + return unitOfWork; + } + + @Override + public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception { + if (unitOfWork != null) { + unitOfWork.popRouteContext(); + } + } + } + + public static class RouteInflightRepositoryTask implements CamelInternalProcessorTask { + + private final InflightRepository inflightRepository; + private final String id; + + public RouteInflightRepositoryTask(InflightRepository inflightRepository, String id) { + this.inflightRepository = inflightRepository; + this.id = id; + } + + @Override + public Object before(Exchange exchange) throws Exception { + inflightRepository.add(exchange, id); + return null; + } + + @Override + public void after(Exchange exchange, Object state) throws Exception { + inflightRepository.remove(exchange, id); + } + } + + public static class RoutePolicyTask implements CamelInternalProcessorTask { + + private final List<RoutePolicy> routePolicies; + private Route route; + + public RoutePolicyTask(List<RoutePolicy> routePolicies) { + this.routePolicies = routePolicies; + } + + public void setRoute(Route route) { + this.route = route; + } + + /** + * Strategy to determine if this policy is allowed to run + * + * @param policy the policy + * @return <tt>true</tt> to run + */ + protected boolean isRoutePolicyRunAllowed(RoutePolicy policy) { + if (policy instanceof StatefulService) { + StatefulService ss = (StatefulService) policy; + return ss.isRunAllowed(); + } + return true; + } + + @Override + public Object before(Exchange exchange) throws Exception { + // invoke begin + for (RoutePolicy policy : routePolicies) { + try { + if (isRoutePolicyRunAllowed(policy)) { + policy.onExchangeBegin(route, exchange); + } + } catch (Exception e) { + LOG.warn("Error occurred during onExchangeBegin on RoutePolicy: " + policy + + ". This exception will be ignored", e); + } + } + return null; + } + + @Override + public void after(Exchange exchange, Object data) throws Exception { + // do not invoke it if Camel is stopping as we don't want + // the policy to start a consumer during Camel is stopping + if (isCamelStopping(exchange.getContext())) { + return; + } + + for (RoutePolicy policy : routePolicies) { + try { + if (isRoutePolicyRunAllowed(policy)) { + policy.onExchangeDone(route, exchange); + } + } catch (Exception e) { + LOG.warn("Error occurred during onExchangeDone on RoutePolicy: " + policy + + ". This exception will be ignored", e); + } + } + } + + private static boolean isCamelStopping(CamelContext context) { + if (context instanceof StatefulService) { + StatefulService ss = (StatefulService) context; + return ss.isStopping() || ss.isStopped(); + } + return false; + } + } + + public static final class BacklogTracerTask implements CamelInternalProcessorTask { + + private final Queue<DefaultBacklogTracerEventMessage> queue; + private final BacklogTracer backlogTracer; + private final ProcessorDefinition<?> processorDefinition; + private final ProcessorDefinition<?> routeDefinition; + private final boolean first; + + public BacklogTracerTask(Queue<DefaultBacklogTracerEventMessage> queue, BacklogTracer backlogTracer, + ProcessorDefinition<?> processorDefinition, ProcessorDefinition<?> routeDefinition, boolean first) { + this.queue = queue; + this.backlogTracer = backlogTracer; + this.processorDefinition = processorDefinition; + this.routeDefinition = routeDefinition; + this.first = first; + } + + @Override + public Object before(Exchange exchange) throws Exception { + if (backlogTracer.shouldTrace(processorDefinition, exchange)) { + // ensure there is space on the queue + int drain = queue.size() - backlogTracer.getBacklogSize(); + // and we need room for ourselves and possible also a first pseudo message as well + drain += first ? 2 : 1; + if (drain > 0) { + for (int i = 0; i < drain; i++) { + queue.poll(); + } + } + + Date timestamp = new Date(); + String toNode = processorDefinition.getId(); + String exchangeId = exchange.getExchangeId(); + String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 4, + backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars()); + + // if first we should add a pseudo trace message as well, so we have a starting message (eg from the route) + String routeId = routeDefinition.getId(); + if (first) { + Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class); + DefaultBacklogTracerEventMessage pseudo = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, messageAsXml); + queue.add(pseudo); + } + DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), timestamp, routeId, toNode, exchangeId, messageAsXml); + queue.add(event); + } + + return null; + } + + @Override + public void after(Exchange exchange, Object data) throws Exception { + // noop + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessorTask.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessorTask.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessorTask.java new file mode 100644 index 0000000..1df3db9 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessorTask.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.Exchange; + +/** + * A task to execute cross cutting functionality in the Camel routing engine. + * <p/> + * The Camel routing engine will execute the {@link #before(org.apache.camel.Exchange)} and + * {@link #after(org.apache.camel.Exchange, Object)} methods during routing in correct order. + * + * @param <T> + * @see CamelInternalProcessor + */ +public interface CamelInternalProcessorTask<T> { + + /** + * Callback executed before processing a step in the route. + * + * @param exchange the current exchange + * @return any state to keep and provide as data to the {@link #after(org.apache.camel.Exchange, Object)} method, or use <tt>null</tt> for no state. + * @throws Exception is thrown if error during the call. + */ + T before(Exchange exchange) throws Exception; + + /** + * Callback executed after processing a step in the route. + * + * @param exchange the current exchange + * @param data the state, if any, returned in the {@link #before(org.apache.camel.Exchange)} method. + * @throws Exception is thrown if error during the call. + */ + void after(Exchange exchange, T data) throws Exception; +} http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java index 716505b..a49dff4 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java @@ -96,7 +96,7 @@ public class DelegateAsyncProcessor extends ServiceSupport implements DelegatePr callback.done(true); return true; } - return AsyncProcessorHelper.process(processor, exchange, callback); + return processor.process(exchange, callback); } public boolean hasNext() { http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java index 3ace4c5..2850cc6 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java @@ -114,7 +114,8 @@ public class Pipeline extends MulticastProcessor implements AsyncProcessor, Trac // implement asynchronous routing logic in callback so we can have the callback being // triggered and then continue routing where we left - boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange, new AsyncCallback() { + //boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange, + boolean sync = asyncProcessor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the pipeline if (doneSync) { http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java index f34b467..02b56a8 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java @@ -126,7 +126,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from // this error handler, which means we have to invoke the callback with false, to have the callback // be notified when we are done - sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() { + sync = outputAsync.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); @@ -147,7 +147,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme } else { // this redelivery task was scheduled from asynchronous, which means we should only // handle when the asynchronous task was done - sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() { + sync = outputAsync.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); @@ -388,7 +388,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme } // process the exchange (also redelivery) - boolean sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() { + boolean sync = outputAsync.process(exchange, new AsyncCallback() { public void done(boolean sync) { // this callback should only handle the async case if (sync) { @@ -483,12 +483,15 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme boolean deliver = true; // the unit of work may have an optional callback associated we need to leverage - SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback(); - if (uowCallback != null) { - // signal to the callback we are exhausted - uowCallback.onExhausted(exchange); - // do not deliver to the failure processor as its been handled by the callback instead - deliver = false; + UnitOfWork uow = exchange.getUnitOfWork(); + if (uow != null) { + SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback(); + if (uowCallback != null) { + // signal to the callback we are exhausted + uowCallback.onExhausted(exchange); + // do not deliver to the failure processor as its been handled by the callback instead + deliver = false; + } } if (deliver) { @@ -825,7 +828,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // the failure processor could also be asynchronous AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor); - sync = AsyncProcessorHelper.process(afp, exchange, new AsyncCallback() { + sync = afp.process(exchange, new AsyncCallback() { public void done(boolean sync) { log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange); try { http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java index 6a9d98e..aa6eff2 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java @@ -26,6 +26,7 @@ import org.apache.camel.spi.UnitOfWork; * This processor tracks the current {@link RouteContext} while processing the {@link Exchange}. * This ensures that the {@link Exchange} have details under which route its being currently processed. */ +@Deprecated public class RouteContextProcessor extends DelegateAsyncProcessor { private final RouteContext routeContext; http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java index d236e96..42da65c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java @@ -27,6 +27,7 @@ import org.apache.camel.spi.RouteContext; * This processor tracks the current {@link RouteContext} while processing the {@link Exchange}. * This ensures that the {@link Exchange} have details under which route its being currently processed. */ +@Deprecated public class RouteInflightRepositoryProcessor extends DelegateAsyncProcessor { private final InflightRepository inflightRepository; http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java index 8ca0bf3..37d7ffb 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; * * @version */ +@Deprecated public class RoutePolicyProcessor extends DelegateAsyncProcessor { private static final Logger LOG = LoggerFactory.getLogger(RoutePolicyProcessor.class); http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java index 26226fe..183cfa0 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java @@ -36,6 +36,11 @@ import org.slf4j.LoggerFactory; * Threads processor that leverage a thread pool for continue processing the {@link Exchange}s * using the asynchronous routing engine. * <p/> + * <b>Notice:</b> For transacted routes then this {@link ThreadsProcessor} is not in use, as we want to + * process messages using the same thread to support all work done in the same transaction. The reason + * is that the transaction manager that orchestrate the transaction, requires all the work to be done + * on the same thread. + * <p/> * Pay attention to how this processor handles rejected tasks. * <ul> * <li>Abort - The current exchange will be set with a {@link RejectedExecutionException} exception, @@ -120,6 +125,14 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor { throw new IllegalStateException("ThreadsProcessor is not running."); } + // we cannot execute this asynchronously for transacted exchanges, as the transaction manager doesn't support + // using different threads in the same transaction + if (exchange.isTransacted()) { + LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); + callback.done(true); + return true; + } + ProcessCall call = new ProcessCall(exchange, callback); try { LOG.trace("Submitting task {}", call); http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java index a4ba407..0e37fab 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java @@ -82,6 +82,7 @@ public class UnitOfWorkProcessor extends DelegateAsyncProcessor { // if a route context has been configured, then wrap the processor with a // RouteContextProcessor to ensure we track the route context properly during // processing of the exchange, but only do this once + // TODO: This can possible be removed! if (routeContext != null && (!(processor instanceof RouteContextProcessor))) { processor = new RouteContextProcessor(routeContext, processor); } http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java index 5c156b0..231628f 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java @@ -75,6 +75,14 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy { this.camelContext = camelContext; } + public void addDefinition(ProcessorDefinition<?> definition) { + processors.add(definition); + } + + public Queue<DefaultBacklogTracerEventMessage> getQueue() { + return queue; + } + @Override public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition, Processor target, Processor nextTarget) throws Exception { // is this the first output from a route, as we want to know this so we can do special logic in first @@ -317,7 +325,7 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy { queue.clear(); } - long incrementTraceCounter() { + public long incrementTraceCounter() { return traceCounter.incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracerInterceptor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracerInterceptor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracerInterceptor.java index 9a02f36..c302b52 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracerInterceptor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracerInterceptor.java @@ -29,6 +29,7 @@ import org.apache.camel.util.MessageHelper; /** * An interceptor for tracing messages by using the {@link BacklogTracer}. */ +@Deprecated public class BacklogTracerInterceptor extends DelegateAsyncProcessor { private final Queue<DefaultBacklogTracerEventMessage> queue; http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java index b5a99a1..aecad64 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java @@ -32,15 +32,16 @@ import org.apache.camel.Processor; import org.apache.camel.Service; import org.apache.camel.model.ModelChannel; import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.ProcessorDefinitionHelper; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.InterceptorToAsyncProcessorBridge; -import org.apache.camel.processor.RouteContextProcessor; import org.apache.camel.processor.WrapProcessor; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.RouteContext; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; -import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.OrderedComparator; import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; @@ -72,7 +73,7 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { private ProcessorDefinition<?> childDefinition; private CamelContext camelContext; private RouteContext routeContext; - private RouteContextProcessor routeContextProcessor; + private CamelInternalProcessor internalProcessor; public List<Processor> next() { List<Processor> answer = new ArrayList<Processor>(1); @@ -148,20 +149,23 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { @Override protected void doStart() throws Exception { - // create route context processor to wrap output - routeContextProcessor = new RouteContextProcessor(routeContext, getOutput()); - ServiceHelper.startServices(errorHandler, output, routeContextProcessor); + // the output has now been created, so assign the output to the internal processor + internalProcessor.setProcessor(getOutput()); + ServiceHelper.startServices(errorHandler, output, internalProcessor); } @Override protected void doStop() throws Exception { - ServiceHelper.stopServices(output, errorHandler, routeContextProcessor); + ServiceHelper.stopServices(output, errorHandler, internalProcessor); } public void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception { this.routeContext = routeContext; this.definition = outputDefinition; this.camelContext = routeContext.getCamelContext(); + this.internalProcessor = new CamelInternalProcessor(); + // TODO: The route context task can likely be only added in DefaultRouteContext once per route + this.internalProcessor.addTask(new CamelInternalProcessor.RouteContextTask(routeContext)); Processor target = nextProcessor; Processor next; @@ -194,8 +198,22 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { } // then wrap the output with the backlog and tracer (backlog first, as we do not want regular tracer to tracer the backlog) - BacklogTracerInterceptor backlog = (BacklogTracerInterceptor) getOrCreateBacklogTracer().wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, null); - TraceInterceptor trace = (TraceInterceptor) getOrCreateTracer().wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, backlog, null); + InterceptStrategy tracer = getOrCreateBacklogTracer(); + if (tracer instanceof BacklogTracer) { + BacklogTracer backlogTracer = (BacklogTracer) tracer; + backlogTracer.addDefinition(targetOutputDef); + + RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition); + boolean first = false; + if (route != null && !route.getOutputs().isEmpty()) { + first = route.getOutputs().get(0) == definition; + } + + internalProcessor.addTask(new CamelInternalProcessor.BacklogTracerTask(backlogTracer.getQueue(), backlogTracer, targetOutputDef, route, first)); + } + + // TODO: trace interceptor can be a task on internalProcessor + TraceInterceptor trace = (TraceInterceptor) getOrCreateTracer().wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, null); // trace interceptor need to have a reference to route context so we at runtime can enable/disable tracing on-the-fly trace.setRouteContext(routeContext); target = trace; @@ -323,16 +341,13 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { } public boolean process(final Exchange exchange, final AsyncCallback callback) { - Processor processor = getOutput(); - if (processor == null || !continueProcessing(exchange)) { + if (!continueProcessing(exchange)) { // we should not continue routing so we are done callback.done(true); return true; } - // process the exchange using the route context processor - ObjectHelper.notNull(routeContextProcessor, "RouteContextProcessor", this); - return routeContextProcessor.process(exchange, callback); + return internalProcessor.process(exchange, callback); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java index 42f42e3..0875dae 100644 --- a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java @@ -46,7 +46,10 @@ public final class AsyncProcessorHelper { * @param callback the callback * @return <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously */ + @Deprecated public static boolean process(final AsyncProcessor processor, final Exchange exchange, final AsyncCallback callback) { + // TODO: This method is no longer needed, and we can avoid using it + boolean sync; if (exchange.isTransacted()) { http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java new file mode 100644 index 0000000..3edebe4 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Scanner; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class ReduceStacksNeededDuringRoutingTest extends ContextTestSupport { + + public void testReduceStacksNeeded() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + + template.sendBody("seda:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:start") + .to("log:foo") + .to("log:bar") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + try { + throw new IllegalArgumentException("Forced to dump stacktrace"); + } catch (Exception e) { + e.printStackTrace(); + + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + + String s = sw.toString(); + Scanner scanner = new Scanner(s); + scanner.useDelimiter("\n"); + int count = 0; + while (scanner.hasNext()) { + scanner.next(); + count++; + } + System.out.println("There is " + count + " lines in the stacktrace"); + } + } + }) + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/fb15fdee/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithLocalOnExceptionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithLocalOnExceptionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithLocalOnExceptionTest.java index 3c3d683..0b775f4 100644 --- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithLocalOnExceptionTest.java +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithLocalOnExceptionTest.java @@ -28,22 +28,22 @@ public class TransactionalClientDataSourceTransactedWithLocalOnExceptionTest ext return new SpringRouteBuilder() { public void configure() throws Exception { from("direct:okay") - .transacted() // use local on exception .onException(IllegalArgumentException.class) .handled(false) .to("mock:error") .end() + .transacted() .setBody(constant("Tiger in Action")).beanRef("bookService") .setBody(constant("Elephant in Action")).beanRef("bookService"); from("direct:fail") - .transacted() // use local on exception .onException(IllegalArgumentException.class) .handled(false) .to("mock:error") .end() + .transacted() .setBody(constant("Tiger in Action")).beanRef("bookService") .setBody(constant("Donkey in Action")).beanRef("bookService"); }