CAMEL-6377: Optimized routing engine to reduce stack frames in use during 
routing. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9bfdb66a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9bfdb66a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9bfdb66a

Branch: refs/heads/master
Commit: 9bfdb66a12926229298f1501a9d0e8310a69610c
Parents: 89d33f4
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue May 21 10:14:35 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue May 21 10:51:13 2013 +0200

----------------------------------------------------------------------
 .../camel/management/InstrumentationProcessor.java |    1 -
 .../org/apache/camel/processor/CatchProcessor.java |    2 +-
 .../camel/processor/DelegateAsyncProcessor.java    |   13 +++++------
 .../camel/processor/FatalFallbackErrorHandler.java |    4 +-
 .../apache/camel/processor/FinallyProcessor.java   |    4 +-
 .../org/apache/camel/processor/LoopProcessor.java  |    2 +-
 .../camel/processor/RouteContextProcessor.java     |    2 +-
 .../RouteInflightRepositoryProcessor.java          |    2 +-
 .../processor/interceptor/DefaultChannel.java      |   17 ++++++++++----
 .../ReduceStacksNeededDuringRoutingTest.java       |    8 +++++++
 .../AuditInterceptorAsyncDelegateIssueTest.java    |    4 +-
 11 files changed, 36 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
index 18e6dd6..a406cf7 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
  *
  * @version 
  */
-@Deprecated
 public class InstrumentationProcessor extends DelegateAsyncProcessor {
 
     private static final transient Logger LOG = 
LoggerFactory.getLogger(InstrumentationProcessor.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java
index 20fcb29..48c1c10 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java
@@ -89,7 +89,7 @@ public class CatchProcessor extends DelegateAsyncProcessor 
implements Traceable
                     new Object[]{handled, e.getClass().getName(), 
e.getMessage()});
         }
 
-        boolean sync = super.processNext(exchange, new AsyncCallback() {
+        boolean sync = super.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 if (!handled) {
                     if (exchange.getException() == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
index a49dff4..38cfa44 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
@@ -87,16 +87,15 @@ public class DelegateAsyncProcessor extends ServiceSupport 
implements DelegatePr
     }
 
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
-        return processNext(exchange, callback);
+        return processor.process(exchange, callback);
     }
 
+    /**
+     * @deprecated use {@link #process(org.apache.camel.Exchange, 
org.apache.camel.AsyncCallback)} instead
+     */
+    @Deprecated
     protected boolean processNext(Exchange exchange, AsyncCallback callback) {
-        if (processor == null) {
-            // no processor then we are done
-            callback.done(true);
-            return true;
-        }
-        return processor.process(exchange, callback);
+        throw new UnsupportedOperationException("This method is deprecated, 
use process(Exchange, AsyncCallback) instead");
     }
 
     public boolean hasNext() {

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java
 
b/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java
index 929a96b..94ef480 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java
@@ -37,9 +37,9 @@ public class FatalFallbackErrorHandler extends 
DelegateAsyncProcessor implements
     }
 
     @Override
-    protected boolean processNext(final Exchange exchange, final AsyncCallback 
callback) {
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         // support the asynchronous routing engine
-        boolean sync = super.processNext(exchange, new AsyncCallback() {
+        boolean sync = super.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 if (exchange.getException() != null) {
                     // an exception occurred during processing onException

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
index c88c7c9..deb6bcf 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -38,7 +38,7 @@ public class FinallyProcessor extends DelegateAsyncProcessor 
implements Traceabl
     }
 
     @Override
-    protected boolean processNext(final Exchange exchange, final AsyncCallback 
callback) {
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         // clear exception so finally block can be executed
         final Exception e = exchange.getException();
         exchange.setException(null);
@@ -51,7 +51,7 @@ public class FinallyProcessor extends DelegateAsyncProcessor 
implements Traceabl
             exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
         }
 
-        boolean sync = super.processNext(exchange, new AsyncCallback() {
+        boolean sync = super.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 if (e == null) {
                     exchange.removeProperty(Exchange.FAILURE_ENDPOINT);

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 3c943c9..6db4e72 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -100,7 +100,7 @@ public class LoopProcessor extends DelegateAsyncProcessor 
implements Traceable {
         LOG.debug("LoopProcessor: iteration #{}", index.get());
         exchange.setProperty(Exchange.LOOP_INDEX, index.get());
 
-        boolean sync = processNext(exchange, new AsyncCallback() {
+        boolean sync = super.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 // we only have to handle async completion of the routing slip
                 if (doneSync) {

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java
index aa6eff2..195d399 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java
@@ -37,7 +37,7 @@ public class RouteContextProcessor extends 
DelegateAsyncProcessor {
     }
 
     @Override
-    protected boolean processNext(final Exchange exchange, final AsyncCallback 
callback) {
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         // push the current route context
         final UnitOfWork unitOfWork = exchange.getUnitOfWork();
         if (unitOfWork != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
index 42da65c..1385fa9 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
@@ -43,7 +43,7 @@ public class RouteInflightRepositoryProcessor extends 
DelegateAsyncProcessor {
     }
 
     @Override
-    protected boolean processNext(final Exchange exchange, final AsyncCallback 
callback) {
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         inflightRepository.add(exchange, id);
         
         boolean sync = processor.process(exchange, new AsyncCallback() {

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index 41c002d..2347dfd 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -210,11 +210,13 @@ public class DefaultChannel extends ServiceSupport 
implements ModelChannel {
             internalProcessor.addTask(new 
CamelInternalProcessor.BacklogTracerTask(backlogTracer.getQueue(), 
backlogTracer, targetOutputDef, route, first));
         }
 
-        // TODO: trace interceptor can be a task on internalProcessor
-        TraceInterceptor trace = (TraceInterceptor) 
getOrCreateTracer().wrapProcessorInInterceptors(routeContext.getCamelContext(), 
targetOutputDef, target, null);
-        // trace interceptor need to have a reference to route context so we 
at runtime can enable/disable tracing on-the-fly
-        trace.setRouteContext(routeContext);
-        target = trace;
+        tracer = getOrCreateTracer();
+        if (tracer != null) {
+            TraceInterceptor trace = (TraceInterceptor) 
tracer.wrapProcessorInInterceptors(routeContext.getCamelContext(), 
targetOutputDef, target, null);
+            // trace interceptor need to have a reference to route context so 
we at runtime can enable/disable tracing on-the-fly
+            trace.setRouteContext(routeContext);
+            target = trace;
+        }
 
         // sort interceptors according to ordered
         Collections.sort(interceptors, new OrderedComparator());
@@ -274,6 +276,11 @@ public class DefaultChannel extends ServiceSupport 
implements ModelChannel {
     }
 
     private InterceptStrategy getOrCreateTracer() {
+        // only use tracer if explicit enabled
+        if (camelContext.isTracing() != null && !camelContext.isTracing()) {
+            return null;
+        }
+
         InterceptStrategy tracer = Tracer.getTracer(camelContext);
         if (tracer == null) {
             if (camelContext.getRegistry() != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
index bf8c997..3267459 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
@@ -31,6 +31,11 @@ import org.apache.camel.component.mock.MockEndpoint;
  */
 public class ReduceStacksNeededDuringRoutingTest extends ContextTestSupport {
 
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
     public void testReduceStacksNeeded() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World");
@@ -45,9 +50,12 @@ public class ReduceStacksNeededDuringRoutingTest extends 
ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                // context.setTracing(true);
+
                 from("seda:start")
                         .to("log:foo")
                         .to("log:bar")
+                        .to("log:baz")
                         .process(new Processor() {
                             @Override
                             public void process(Exchange exchange) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfdb66a/camel-core/src/test/java/org/apache/camel/processor/interceptor/AuditInterceptorAsyncDelegateIssueTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/interceptor/AuditInterceptorAsyncDelegateIssueTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/interceptor/AuditInterceptorAsyncDelegateIssueTest.java
index 536400a..9e4a9fe 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/interceptor/AuditInterceptorAsyncDelegateIssueTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/interceptor/AuditInterceptorAsyncDelegateIssueTest.java
@@ -90,9 +90,9 @@ public class AuditInterceptorAsyncDelegateIssueTest extends 
ContextTestSupport {
 
         public Processor wrapProcessorInInterceptors(CamelContext context, 
ProcessorDefinition<?> definition, Processor target, Processor nextTarget) 
throws Exception {
             return new DelegateAsyncProcessor(target) {
-                protected boolean processNext(Exchange exchange, AsyncCallback 
callback) {
+                public boolean process(Exchange exchange, AsyncCallback 
callback) {
                     invoked = true;
-                    return super.processNext(exchange, callback);
+                    return super.process(exchange, callback);
                 }
             };
         }

Reply via email to