CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9bfdb66a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9bfdb66a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9bfdb66a Branch: refs/heads/master Commit: 9bfdb66a12926229298f1501a9d0e8310a69610c Parents: 89d33f4 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue May 21 10:14:35 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue May 21 10:51:13 2013 +0200 ---------------------------------------------------------------------- .../camel/management/InstrumentationProcessor.java | 1 - .../org/apache/camel/processor/CatchProcessor.java | 2 +- .../camel/processor/DelegateAsyncProcessor.java | 13 +++++------ .../camel/processor/FatalFallbackErrorHandler.java | 4 +- .../apache/camel/processor/FinallyProcessor.java | 4 +- .../org/apache/camel/processor/LoopProcessor.java | 2 +- .../camel/processor/RouteContextProcessor.java | 2 +- .../RouteInflightRepositoryProcessor.java | 2 +- .../processor/interceptor/DefaultChannel.java | 17 ++++++++++---- .../ReduceStacksNeededDuringRoutingTest.java | 8 +++++++ .../AuditInterceptorAsyncDelegateIssueTest.java | 4 +- 11 files changed, 36 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java index 18e6dd6..a406cf7 100644 --- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; * * @version */ -@Deprecated public class InstrumentationProcessor extends DelegateAsyncProcessor { private static final transient Logger LOG = LoggerFactory.getLogger(InstrumentationProcessor.class); http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java index 20fcb29..48c1c10 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java @@ -89,7 +89,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable new Object[]{handled, e.getClass().getName(), e.getMessage()}); } - boolean sync = super.processNext(exchange, new AsyncCallback() { + boolean sync = super.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { if (!handled) { if (exchange.getException() == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java index a49dff4..38cfa44 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java @@ -87,16 +87,15 @@ public class DelegateAsyncProcessor extends ServiceSupport implements DelegatePr } public boolean process(final Exchange exchange, final AsyncCallback callback) { - return processNext(exchange, callback); + return processor.process(exchange, callback); } + /** + * @deprecated use {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} instead + */ + @Deprecated protected boolean processNext(Exchange exchange, AsyncCallback callback) { - if (processor == null) { - // no processor then we are done - callback.done(true); - return true; - } - return processor.process(exchange, callback); + throw new UnsupportedOperationException("This method is deprecated, use process(Exchange, AsyncCallback) instead"); } public boolean hasNext() { http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java index 929a96b..94ef480 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java @@ -37,9 +37,9 @@ public class FatalFallbackErrorHandler extends DelegateAsyncProcessor implements } @Override - protected boolean processNext(final Exchange exchange, final AsyncCallback callback) { + public boolean process(final Exchange exchange, final AsyncCallback callback) { // support the asynchronous routing engine - boolean sync = super.processNext(exchange, new AsyncCallback() { + boolean sync = super.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { if (exchange.getException() != null) { // an exception occurred during processing onException http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java index c88c7c9..deb6bcf 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java @@ -38,7 +38,7 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl } @Override - protected boolean processNext(final Exchange exchange, final AsyncCallback callback) { + public boolean process(final Exchange exchange, final AsyncCallback callback) { // clear exception so finally block can be executed final Exception e = exchange.getException(); exchange.setException(null); @@ -51,7 +51,7 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); } - boolean sync = super.processNext(exchange, new AsyncCallback() { + boolean sync = super.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { if (e == null) { exchange.removeProperty(Exchange.FAILURE_ENDPOINT); http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java index 3c943c9..6db4e72 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -100,7 +100,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable { LOG.debug("LoopProcessor: iteration #{}", index.get()); exchange.setProperty(Exchange.LOOP_INDEX, index.get()); - boolean sync = processNext(exchange, new AsyncCallback() { + boolean sync = super.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the routing slip if (doneSync) { http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java index aa6eff2..195d399 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java @@ -37,7 +37,7 @@ public class RouteContextProcessor extends DelegateAsyncProcessor { } @Override - protected boolean processNext(final Exchange exchange, final AsyncCallback callback) { + public boolean process(final Exchange exchange, final AsyncCallback callback) { // push the current route context final UnitOfWork unitOfWork = exchange.getUnitOfWork(); if (unitOfWork != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java index 42da65c..1385fa9 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java @@ -43,7 +43,7 @@ public class RouteInflightRepositoryProcessor extends DelegateAsyncProcessor { } @Override - protected boolean processNext(final Exchange exchange, final AsyncCallback callback) { + public boolean process(final Exchange exchange, final AsyncCallback callback) { inflightRepository.add(exchange, id); boolean sync = processor.process(exchange, new AsyncCallback() { http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java index 41c002d..2347dfd 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java @@ -210,11 +210,13 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { internalProcessor.addTask(new CamelInternalProcessor.BacklogTracerTask(backlogTracer.getQueue(), backlogTracer, targetOutputDef, route, first)); } - // TODO: trace interceptor can be a task on internalProcessor - TraceInterceptor trace = (TraceInterceptor) getOrCreateTracer().wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, null); - // trace interceptor need to have a reference to route context so we at runtime can enable/disable tracing on-the-fly - trace.setRouteContext(routeContext); - target = trace; + tracer = getOrCreateTracer(); + if (tracer != null) { + TraceInterceptor trace = (TraceInterceptor) tracer.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, null); + // trace interceptor need to have a reference to route context so we at runtime can enable/disable tracing on-the-fly + trace.setRouteContext(routeContext); + target = trace; + } // sort interceptors according to ordered Collections.sort(interceptors, new OrderedComparator()); @@ -274,6 +276,11 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { } private InterceptStrategy getOrCreateTracer() { + // only use tracer if explicit enabled + if (camelContext.isTracing() != null && !camelContext.isTracing()) { + return null; + } + InterceptStrategy tracer = Tracer.getTracer(camelContext); if (tracer == null) { if (camelContext.getRegistry() != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java index bf8c997..3267459 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java @@ -31,6 +31,11 @@ import org.apache.camel.component.mock.MockEndpoint; */ public class ReduceStacksNeededDuringRoutingTest extends ContextTestSupport { + @Override + protected boolean useJmx() { + return true; + } + public void testReduceStacksNeeded() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("Hello World"); @@ -45,9 +50,12 @@ public class ReduceStacksNeededDuringRoutingTest extends ContextTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { + // context.setTracing(true); + from("seda:start") .to("log:foo") .to("log:bar") + .to("log:baz") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/test/java/org/apache/camel/processor/interceptor/AuditInterceptorAsyncDelegateIssueTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/interceptor/AuditInterceptorAsyncDelegateIssueTest.java b/camel-core/src/test/java/org/apache/camel/processor/interceptor/AuditInterceptorAsyncDelegateIssueTest.java index 536400a..9e4a9fe 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/interceptor/AuditInterceptorAsyncDelegateIssueTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/interceptor/AuditInterceptorAsyncDelegateIssueTest.java @@ -90,9 +90,9 @@ public class AuditInterceptorAsyncDelegateIssueTest extends ContextTestSupport { public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition, Processor target, Processor nextTarget) throws Exception { return new DelegateAsyncProcessor(target) { - protected boolean processNext(Exchange exchange, AsyncCallback callback) { + public boolean process(Exchange exchange, AsyncCallback callback) { invoked = true; - return super.processNext(exchange, callback); + return super.process(exchange, callback); } }; }