CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4f67732b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4f67732b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4f67732b Branch: refs/heads/master Commit: 4f67732b6302a9675c44b3befbcc6e3a1567d4d6 Parents: 89a575a Author: Claus Ibsen <davscl...@apache.org> Authored: Tue May 28 10:44:25 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue May 28 10:44:25 2013 +0200 ---------------------------------------------------------------------- .../camel/impl/EventDrivenConsumerRoute.java | 19 +++--- .../camel/processor/CamelInternalProcessor.java | 2 +- .../processor/interceptor/DefaultChannel.java | 53 ++++++--------- .../java/org/apache/camel/util/ServiceHelper.java | 16 +++-- .../management/ManagedRouteDumpStatsAsXmlTest.java | 4 +- .../apache/camel/processor/ChoiceWithEndTest.java | 1 - .../apache/camel/processor/SplitWithEndTest.java | 1 - 7 files changed, 46 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4f67732b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java index b733798..c9e2709 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java +++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java @@ -76,17 +76,16 @@ public class EventDrivenConsumerRoute extends DefaultRoute { public Navigate<Processor> navigate() { Processor answer = getProcessor(); - // we do not want to navigate the instrument and inflight processors - // which is the first 2 delegate async processors, so skip them - // skip the instrumentation processor if this route was wrapped by one - if (answer instanceof DelegateAsyncProcessor) { - answer = ((DelegateAsyncProcessor) answer).getProcessor(); - if (answer instanceof DelegateAsyncProcessor) { - answer = ((DelegateAsyncProcessor) answer).getProcessor(); - } - } - + // we want navigating routes to be easy, so skip the initial channel + // and navigate to its output where it all starts from end user point of view if (answer instanceof Navigate) { + Navigate<Processor> nav = (Navigate<Processor>) answer; + if (nav.next().size() == 1) { + Object first = nav.next().get(0); + if (first instanceof Navigate) { + return (Navigate<Processor>) first; + } + } return (Navigate<Processor>) answer; } return null; http://git-wip-us.apache.org/repos/asf/camel/blob/4f67732b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 3aa0f91..d2eff83 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory; * read the source code of this class about the debugging tips, which you can find in the * {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method. */ -public final class CamelInternalProcessor extends DelegateAsyncProcessor { +public class CamelInternalProcessor extends DelegateAsyncProcessor { private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class); private final List<CamelInternalProcessorTask> tasks = new ArrayList<CamelInternalProcessorTask>(); http://git-wip-us.apache.org/repos/asf/camel/blob/4f67732b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java index a6282fb..fa6656b 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; @@ -40,8 +39,6 @@ import org.apache.camel.processor.WrapProcessor; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.ServiceSupport; -import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.OrderedComparator; import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; @@ -59,7 +56,7 @@ import org.slf4j.LoggerFactory; * * @version */ -public class DefaultChannel extends ServiceSupport implements ModelChannel { +public class DefaultChannel extends CamelInternalProcessor implements ModelChannel { private static final transient Logger LOG = LoggerFactory.getLogger(DefaultChannel.class); @@ -73,17 +70,6 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { private ProcessorDefinition<?> childDefinition; private CamelContext camelContext; private RouteContext routeContext; - private CamelInternalProcessor internalProcessor; - - public List<Processor> next() { - List<Processor> answer = new ArrayList<Processor>(1); - answer.add(nextProcessor); - return answer; - } - - public boolean hasNext() { - return nextProcessor != null; - } public void setNextProcessor(Processor next) { this.nextProcessor = next; @@ -98,6 +84,21 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { return errorHandler != null ? errorHandler : output; } + @Override + public boolean hasNext() { + return nextProcessor != null; + } + + @Override + public List<Processor> next() { + if (!hasNext()) { + return null; + } + List<Processor> answer = new ArrayList<Processor>(1); + answer.add(nextProcessor); + return answer; + } + public void setOutput(Processor output) { this.output = output; } @@ -149,21 +150,20 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { @Override protected void doStart() throws Exception { - // the output has now been created, so assign the output to the internal processor - internalProcessor.setProcessor(getOutput()); - ServiceHelper.startServices(errorHandler, output, internalProcessor); + // the output has now been created, so assign the output as the processor + setProcessor(getOutput()); + ServiceHelper.startServices(errorHandler, output); } @Override protected void doStop() throws Exception { - ServiceHelper.stopServices(output, errorHandler, internalProcessor); + ServiceHelper.stopServices(output, errorHandler); } public void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception { this.routeContext = routeContext; this.definition = outputDefinition; this.camelContext = routeContext.getCamelContext(); - this.internalProcessor = new CamelInternalProcessor(); Processor target = nextProcessor; Processor next; @@ -210,13 +210,13 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { first = route.getOutputs().get(0) == definition; } - internalProcessor.addTask(new CamelInternalProcessor.BacklogTracerTask(backlogTracer.getQueue(), backlogTracer, targetOutputDef, route, first)); + addTask(new CamelInternalProcessor.BacklogTracerTask(backlogTracer.getQueue(), backlogTracer, targetOutputDef, route, first)); // add debugger as well so we have both tracing and debugging out of the box InterceptStrategy debugger = getOrCreateBacklogDebugger(); if (debugger instanceof BacklogDebugger) { BacklogDebugger backlogDebugger = (BacklogDebugger) debugger; - internalProcessor.addTask(new CamelInternalProcessor.BacklogDebuggerTask(backlogDebugger, target, targetOutputDef)); + addTask(new CamelInternalProcessor.BacklogDebuggerTask(backlogDebugger, target, targetOutputDef)); } } @@ -380,15 +380,6 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { return debugger; } - public void process(Exchange exchange) throws Exception { - AsyncProcessorHelper.process(this, exchange); - } - - public boolean process(final Exchange exchange, final AsyncCallback callback) { - // TODO: We do not need to have DefaultChannel wrapped in the routes, but can just rely on CamelInternalProcessor - return internalProcessor.process(exchange, callback); - } - @Override public String toString() { // just output the next processor as all the interceptors and error handler is just too verbose http://git-wip-us.apache.org/repos/asf/camel/blob/4f67732b/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java b/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java index 1c6877b..d5f3b39 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java @@ -391,11 +391,17 @@ public final class ServiceHelper { if (nav.hasNext()) { List<?> children = nav.next(); for (Object child : children) { - // special for error handler as they are tied to the Channel - if (child instanceof Channel && includeErrorHandler) { - Processor errorHandler = ((Channel) child).getErrorHandler(); - if (errorHandler != null && errorHandler instanceof Service) { - services.add((Service) errorHandler); + if (child instanceof Channel) { + if (includeErrorHandler) { + // special for error handler as they are tied to the Channel + Processor errorHandler = ((Channel) child).getErrorHandler(); + if (errorHandler != null && errorHandler instanceof Service) { + services.add((Service) errorHandler); + } + } + Processor next = ((Channel) child).getNextProcessor(); + if (next != null && next instanceof Service) { + services.add((Service) next); } } if (child instanceof Service) { http://git-wip-us.apache.org/repos/asf/camel/blob/4f67732b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDumpStatsAsXmlTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDumpStatsAsXmlTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDumpStatsAsXmlTest.java index edb8e8d..12f9c74 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDumpStatsAsXmlTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDumpStatsAsXmlTest.java @@ -55,7 +55,9 @@ public class ManagedRouteDumpStatsAsXmlTest extends ManagementTestSupport { @Override public void configure() throws Exception { from("direct:start").routeId("foo") - .to("log:foo").delay(100).to("mock:result"); + .to("log:foo").id("to-log") + .delay(100) + .to("mock:result").id("to-mock"); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/4f67732b/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java b/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java index aba0408..ae7ace9 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java @@ -55,7 +55,6 @@ public class ChoiceWithEndTest extends ContextTestSupport { } } return answer; - } public void testChoiceHello() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/4f67732b/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java index 3b34ad2..b4b4792 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java @@ -56,7 +56,6 @@ public class SplitWithEndTest extends ContextTestSupport { assertMockEndpointsSatisfied(); } - @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() {