CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9ede1339 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9ede1339 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9ede1339 Branch: refs/heads/master Commit: 9ede13394374659a87d313951eef9bdbafd5ff5f Parents: 7bcf843 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon May 20 18:06:16 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue May 21 07:43:38 2013 +0200 ---------------------------------------------------------------------- .../src/main/java/org/apache/camel/Exchange.java | 4 + .../org/apache/camel/impl/DefaultRouteContext.java | 8 +- .../camel/processor/CamelInternalProcessor.java | 96 ++++++++++++++- .../processor/interceptor/DefaultChannel.java | 1 + .../apache/camel/processor/ChoiceWithEndTest.java | 2 - .../apache/camel/processor/NavigateRouteTest.java | 2 +- .../RandomLoadBalanceJavaDSLBuilderTest.java | 16 +-- .../ReduceStacksNeededDuringRoutingTest.java | 3 +- .../apache/camel/processor/SplitWithEndTest.java | 2 - 9 files changed, 109 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/main/java/org/apache/camel/Exchange.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java index 1c2e1bc..5e8a88d 100644 --- a/camel-core/src/main/java/org/apache/camel/Exchange.java +++ b/camel-core/src/main/java/org/apache/camel/Exchange.java @@ -193,6 +193,10 @@ public interface Exchange { String TRANSFER_ENCODING = "Transfer-Encoding"; String UNIT_OF_WORK_EXHAUSTED = "CamelUnitOfWorkExhausted"; + /** + * @deprecated UNIT_OF_WORK_PROCESS_SYNC is not in use and will be removed in future Camel release + */ + @Deprecated String UNIT_OF_WORK_PROCESS_SYNC = "CamelUnitOfWorkProcessSync"; String XSLT_FILE_NAME = "CamelXsltFileName"; http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java index 0531e6c..07c3235 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java @@ -150,10 +150,13 @@ public class DefaultRouteContext implements RouteContext { if (!eventDrivenProcessors.isEmpty()) { Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors); + String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory()); + // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW - UnitOfWorkProcessor unitOfWorkProcessor = new UnitOfWorkProcessor(this, target); + //UnitOfWorkProcessor unitOfWorkProcessor = new UnitOfWorkProcessor(this, target); - CamelInternalProcessor internal = new CamelInternalProcessor(unitOfWorkProcessor); + CamelInternalProcessor internal = new CamelInternalProcessor(target); + internal.addTask(new CamelInternalProcessor.UnitOfWorkProcessorTask(routeId)); // and then optionally add route policy processor if a custom policy is set List<RoutePolicy> routePolicyList = getRoutePolicyList(); @@ -174,7 +177,6 @@ public class DefaultRouteContext implements RouteContext { } // wrap in route inflight processor to track number of inflight exchanges for the route - String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory()); internal.addTask(new CamelInternalProcessor.RouteInflightRepositoryTask(camelContext.getInflightRepository(), routeId)); // TODO: This should be a task as well http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 5e73115..b04d6b9 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -29,6 +29,8 @@ import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.StatefulService; import org.apache.camel.api.management.PerformanceCounter; +import org.apache.camel.impl.DefaultUnitOfWork; +import org.apache.camel.impl.MDCUnitOfWork; import org.apache.camel.management.DelegatePerformanceCounter; import org.apache.camel.management.mbean.ManagedPerformanceCounter; import org.apache.camel.model.ProcessorDefinition; @@ -46,10 +48,11 @@ import org.slf4j.LoggerFactory; /** * Internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as: * <ul> + * <li>Execute {@link UnitOfWork}</li> * <li>Keeping track which route currently is being routed</li> + * <li>Execute {@link RoutePolicy}</li> * <li>Gather JMX performance statics</li> * <li>Tracing</li> - * <li>Execute {@link RoutePolicy}</li> * </ul> * ... and much more. * <p/> @@ -115,9 +118,17 @@ public final class CamelInternalProcessor extends DelegateAsyncProcessor { // create internal callback which will execute the tasks in reverse order when done callback = new InternalCallback(states, exchange, callback); - if (exchange.isTransacted()) { + // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0 + Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC); + if (exchange.isTransacted() || synchronous != null) { // must be synchronized for transacted exchanges - LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); + if (LOG.isTraceEnabled()) { + if (exchange.isTransacted()) { + LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); + } else { + LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); + } + } try { processor.process(exchange); } catch (Throwable e) { @@ -135,6 +146,9 @@ public final class CamelInternalProcessor extends DelegateAsyncProcessor { async = uow.beforeProcess(processor, exchange, callback); } + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); + } boolean sync = processor.process(exchange, async); // execute any after processor work (in current thread, not in the callback) @@ -414,4 +428,80 @@ public final class CamelInternalProcessor extends DelegateAsyncProcessor { } } + public static class UnitOfWorkProcessorTask implements CamelInternalProcessorTask<UnitOfWork> { + + private final String routeId; + + public UnitOfWorkProcessorTask(String routeId) { + this.routeId = routeId; + } + + @Override + public UnitOfWork before(Exchange exchange) throws Exception { + // if the exchange doesn't have from route id set, then set it if it originated + // from this unit of work + if (routeId != null && exchange.getFromRouteId() == null) { + exchange.setFromRouteId(routeId); + } + + if (exchange.getUnitOfWork() == null) { + // If there is no existing UoW, then we should start one and + // terminate it once processing is completed for the exchange. + final UnitOfWork uow = createUnitOfWork(exchange); + exchange.setUnitOfWork(uow); + uow.start(); + return uow; + } + + return null; + } + + @Override + public void after(Exchange exchange, UnitOfWork uow) throws Exception { + if (uow != null) { + doneUow(uow, exchange); + } + } + + /** + * Strategy to create the unit of work for the given exchange. + * + * @param exchange the exchange + * @return the created unit of work + */ + protected UnitOfWork createUnitOfWork(Exchange exchange) { + UnitOfWork answer; + if (exchange.getContext().isUseMDCLogging()) { + answer = new MDCUnitOfWork(exchange); + } else { + answer = new DefaultUnitOfWork(exchange); + } + return answer; + } + + private void doneUow(UnitOfWork uow, Exchange exchange) { + // unit of work is done + try { + if (uow != null) { + uow.done(exchange); + } + } catch (Throwable e) { + LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange + + ". This exception will be ignored.", e); + } + try { + if (uow != null) { + uow.stop(); + } + } catch (Throwable e) { + LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange + + ". This exception will be ignored.", e); + } + + // remove uow from exchange as its done + exchange.setUnitOfWork(null); + } + + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java index aecad64..69248f0 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java @@ -341,6 +341,7 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { } public boolean process(final Exchange exchange, final AsyncCallback callback) { + // TODO: This logic can be in internal processor if (!continueProcessing(exchange)) { // we should not continue routing so we are done callback.done(true); http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java b/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java index 97c37e2..aba0408 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java @@ -34,8 +34,6 @@ public class ChoiceWithEndTest extends ContextTestSupport { // use navigate to find that the end works as expected Navigate<Processor> nav = getRoute("direct://start").navigate(); List<Processor> node = nav.next(); - node = ((Navigate<Processor>) node.get(0)).next(); - node = ((Navigate<Processor>) node.get(0)).next(); // there should be 4 outputs as the end in the otherwise should // ensure that the transform and last send is not within the choice http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java b/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java index 6bd79db..2aae4f1 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java @@ -42,7 +42,7 @@ public class NavigateRouteTest extends ContextTestSupport { Navigate<Processor> nav = context.getRoutes().get(0).navigate(); navigateRoute(nav); - assertEquals("There should be 8 processors to navigate", 8, count); + assertEquals("There should be 6 processors to navigate", 6, count); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java b/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java index 3b51222..2d3000c 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java @@ -74,18 +74,10 @@ public class RandomLoadBalanceJavaDSLBuilderTest extends RandomLoadBalanceTest { return; } - for (Processor child : nav.next()) { - - if (child instanceof RouteContextProcessor) { - child = ((RouteContextProcessor) child).getProcessor(); - } - - if (child instanceof DefaultChannel) { - DefaultChannel channel = (DefaultChannel) child; - ProcessorDefinition<?> def = channel.getProcessorDefinition(); - navigateDefinition(def, sb); - } - + if (nav instanceof DefaultChannel) { + DefaultChannel channel = (DefaultChannel) nav; + ProcessorDefinition<?> def = channel.getProcessorDefinition(); + navigateDefinition(def, sb); } } http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java index e6b4a6c..bf8c997 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java @@ -54,8 +54,6 @@ public class ReduceStacksNeededDuringRoutingTest extends ContextTestSupport { try { throw new IllegalArgumentException("Forced to dump stacktrace"); } catch (Exception e) { - log.error("Dump stacktrace to log", e); - StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); e.printStackTrace(pw); @@ -69,6 +67,7 @@ public class ReduceStacksNeededDuringRoutingTest extends ContextTestSupport { count++; } log.info("There is " + count + " lines in the stacktrace"); + log.error("Dump stacktrace to log", e); } } }) http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java index f80f53f..3b34ad2 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java @@ -35,8 +35,6 @@ public class SplitWithEndTest extends ContextTestSupport { // use navigate to find that the end works as expected Navigate<Processor> nav = context.getRoutes().get(0).navigate(); List<Processor> node = nav.next(); - node = ((Navigate<Processor>) node.get(0)).next(); - node = ((Navigate<Processor>) node.get(0)).next(); // there should be 4 outputs as the end in the otherwise should // ensure that the transform and last send is not within the choice