CAMEL-6377: Optimized routing engine to reduce stack frames in use during 
routing. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9ede1339
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9ede1339
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9ede1339

Branch: refs/heads/master
Commit: 9ede13394374659a87d313951eef9bdbafd5ff5f
Parents: 7bcf843
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon May 20 18:06:16 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue May 21 07:43:38 2013 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/camel/Exchange.java   |    4 +
 .../org/apache/camel/impl/DefaultRouteContext.java |    8 +-
 .../camel/processor/CamelInternalProcessor.java    |   96 ++++++++++++++-
 .../processor/interceptor/DefaultChannel.java      |    1 +
 .../apache/camel/processor/ChoiceWithEndTest.java  |    2 -
 .../apache/camel/processor/NavigateRouteTest.java  |    2 +-
 .../RandomLoadBalanceJavaDSLBuilderTest.java       |   16 +--
 .../ReduceStacksNeededDuringRoutingTest.java       |    3 +-
 .../apache/camel/processor/SplitWithEndTest.java   |    2 -
 9 files changed, 109 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/main/java/org/apache/camel/Exchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java 
b/camel-core/src/main/java/org/apache/camel/Exchange.java
index 1c2e1bc..5e8a88d 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -193,6 +193,10 @@ public interface Exchange {
     String TRANSFER_ENCODING     = "Transfer-Encoding";
 
     String UNIT_OF_WORK_EXHAUSTED    = "CamelUnitOfWorkExhausted";
+    /**
+     * @deprecated UNIT_OF_WORK_PROCESS_SYNC is not in use and will be removed 
in future Camel release
+     */
+    @Deprecated
     String UNIT_OF_WORK_PROCESS_SYNC = "CamelUnitOfWorkProcessSync";
 
     String XSLT_FILE_NAME = "CamelXsltFileName";

http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
index 0531e6c..07c3235 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
@@ -150,10 +150,13 @@ public class DefaultRouteContext implements RouteContext {
         if (!eventDrivenProcessors.isEmpty()) {
             Processor target = Pipeline.newInstance(getCamelContext(), 
eventDrivenProcessors);
 
+            String routeId = 
route.idOrCreate(getCamelContext().getNodeIdFactory());
+
             // and wrap it in a unit of work so the UoW is on the top, so the 
entire route will be in the same UoW
-            UnitOfWorkProcessor unitOfWorkProcessor = new 
UnitOfWorkProcessor(this, target);
+            //UnitOfWorkProcessor unitOfWorkProcessor = new 
UnitOfWorkProcessor(this, target);
 
-            CamelInternalProcessor internal = new 
CamelInternalProcessor(unitOfWorkProcessor);
+            CamelInternalProcessor internal = new 
CamelInternalProcessor(target);
+            internal.addTask(new 
CamelInternalProcessor.UnitOfWorkProcessorTask(routeId));
 
             // and then optionally add route policy processor if a custom 
policy is set
             List<RoutePolicy> routePolicyList = getRoutePolicyList();
@@ -174,7 +177,6 @@ public class DefaultRouteContext implements RouteContext {
             }
 
             // wrap in route inflight processor to track number of inflight 
exchanges for the route
-            String routeId = 
route.idOrCreate(getCamelContext().getNodeIdFactory());
             internal.addTask(new 
CamelInternalProcessor.RouteInflightRepositoryTask(camelContext.getInflightRepository(),
 routeId));
 
             // TODO: This should be a task as well

http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 5e73115..b04d6b9 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -29,6 +29,8 @@ import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.StatefulService;
 import org.apache.camel.api.management.PerformanceCounter;
+import org.apache.camel.impl.DefaultUnitOfWork;
+import org.apache.camel.impl.MDCUnitOfWork;
 import org.apache.camel.management.DelegatePerformanceCounter;
 import org.apache.camel.management.mbean.ManagedPerformanceCounter;
 import org.apache.camel.model.ProcessorDefinition;
@@ -46,10 +48,11 @@ import org.slf4j.LoggerFactory;
 /**
  * Internal {@link Processor} that Camel routing engine used during routing 
for cross cutting functionality such as:
  * <ul>
+ *     <li>Execute {@link UnitOfWork}</li>
  *     <li>Keeping track which route currently is being routed</li>
+ *     <li>Execute {@link RoutePolicy}</li>
  *     <li>Gather JMX performance statics</li>
  *     <li>Tracing</li>
- *     <li>Execute {@link RoutePolicy}</li>
  * </ul>
  * ... and much more.
  * <p/>
@@ -115,9 +118,17 @@ public final class CamelInternalProcessor extends 
DelegateAsyncProcessor {
         // create internal callback which will execute the tasks in reverse 
order when done
         callback = new InternalCallback(states, exchange, callback);
 
-        if (exchange.isTransacted()) {
+        // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it 
from Camel 3.0
+        Object synchronous = 
exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
+        if (exchange.isTransacted() || synchronous != null) {
             // must be synchronized for transacted exchanges
-            LOG.trace("Transacted Exchange must be routed synchronously for 
exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+            if (LOG.isTraceEnabled()) {
+                if (exchange.isTransacted()) {
+                    LOG.trace("Transacted Exchange must be routed 
synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+                } else {
+                    LOG.trace("Synchronous UnitOfWork Exchange must be routed 
synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+                }
+            }
             try {
                 processor.process(exchange);
             } catch (Throwable e) {
@@ -135,6 +146,9 @@ public final class CamelInternalProcessor extends 
DelegateAsyncProcessor {
                 async = uow.beforeProcess(processor, exchange, callback);
             }
 
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing exchange for exchangeId: {} -> {}", 
exchange.getExchangeId(), exchange);
+            }
             boolean sync = processor.process(exchange, async);
 
             // execute any after processor work (in current thread, not in the 
callback)
@@ -414,4 +428,80 @@ public final class CamelInternalProcessor extends 
DelegateAsyncProcessor {
         }
     }
 
+    public static class UnitOfWorkProcessorTask implements 
CamelInternalProcessorTask<UnitOfWork> {
+
+        private final String routeId;
+
+        public UnitOfWorkProcessorTask(String routeId) {
+            this.routeId = routeId;
+        }
+
+        @Override
+        public UnitOfWork before(Exchange exchange) throws Exception {
+            // if the exchange doesn't have from route id set, then set it if 
it originated
+            // from this unit of work
+            if (routeId != null && exchange.getFromRouteId() == null) {
+                exchange.setFromRouteId(routeId);
+            }
+
+            if (exchange.getUnitOfWork() == null) {
+                // If there is no existing UoW, then we should start one and
+                // terminate it once processing is completed for the exchange.
+                final UnitOfWork uow = createUnitOfWork(exchange);
+                exchange.setUnitOfWork(uow);
+                uow.start();
+                return uow;
+            }
+
+            return null;
+        }
+
+        @Override
+        public void after(Exchange exchange, UnitOfWork uow) throws Exception {
+            if (uow != null) {
+                doneUow(uow, exchange);
+            }
+        }
+
+            /**
+             * Strategy to create the unit of work for the given exchange.
+             *
+             * @param exchange the exchange
+             * @return the created unit of work
+             */
+        protected UnitOfWork createUnitOfWork(Exchange exchange) {
+            UnitOfWork answer;
+            if (exchange.getContext().isUseMDCLogging()) {
+                answer = new MDCUnitOfWork(exchange);
+            } else {
+                answer = new DefaultUnitOfWork(exchange);
+            }
+            return answer;
+        }
+
+        private void doneUow(UnitOfWork uow, Exchange exchange) {
+            // unit of work is done
+            try {
+                if (uow != null) {
+                    uow.done(exchange);
+                }
+            } catch (Throwable e) {
+                LOG.warn("Exception occurred during done UnitOfWork for 
Exchange: " + exchange
+                        + ". This exception will be ignored.", e);
+            }
+            try {
+                if (uow != null) {
+                    uow.stop();
+                }
+            } catch (Throwable e) {
+                LOG.warn("Exception occurred during stopping UnitOfWork for 
Exchange: " + exchange
+                        + ". This exception will be ignored.", e);
+            }
+
+            // remove uow from exchange as its done
+            exchange.setUnitOfWork(null);
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index aecad64..69248f0 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -341,6 +341,7 @@ public class DefaultChannel extends ServiceSupport 
implements ModelChannel {
     }
 
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        // TODO: This logic can be in internal processor
         if (!continueProcessing(exchange)) {
             // we should not continue routing so we are done
             callback.done(true);

http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java
index 97c37e2..aba0408 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ChoiceWithEndTest.java
@@ -34,8 +34,6 @@ public class ChoiceWithEndTest extends ContextTestSupport {
         // use navigate to find that the end works as expected
         Navigate<Processor> nav = getRoute("direct://start").navigate();
         List<Processor> node = nav.next();
-        node = ((Navigate<Processor>) node.get(0)).next();
-        node = ((Navigate<Processor>) node.get(0)).next();
 
         // there should be 4 outputs as the end in the otherwise should
         // ensure that the transform and last send is not within the choice

http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java
index 6bd79db..2aae4f1 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java
@@ -42,7 +42,7 @@ public class NavigateRouteTest extends ContextTestSupport {
         Navigate<Processor> nav = context.getRoutes().get(0).navigate();
         navigateRoute(nav);
 
-        assertEquals("There should be 8 processors to navigate", 8, count);
+        assertEquals("There should be 6 processors to navigate", 6, count);
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
index 3b51222..2d3000c 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
@@ -74,18 +74,10 @@ public class RandomLoadBalanceJavaDSLBuilderTest extends 
RandomLoadBalanceTest {
             return;
         }
 
-        for (Processor child : nav.next()) {
-
-            if (child instanceof RouteContextProcessor) {
-                child = ((RouteContextProcessor) child).getProcessor();
-            }
-
-            if (child instanceof DefaultChannel) {
-                DefaultChannel channel = (DefaultChannel) child;
-                ProcessorDefinition<?> def = channel.getProcessorDefinition();
-                navigateDefinition(def, sb);
-            }
-
+        if (nav instanceof DefaultChannel) {
+            DefaultChannel channel = (DefaultChannel) nav;
+            ProcessorDefinition<?> def = channel.getProcessorDefinition();
+            navigateDefinition(def, sb);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
index e6b4a6c..bf8c997 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/ReduceStacksNeededDuringRoutingTest.java
@@ -54,8 +54,6 @@ public class ReduceStacksNeededDuringRoutingTest extends 
ContextTestSupport {
                                 try {
                                     throw new IllegalArgumentException("Forced 
to dump stacktrace");
                                 } catch (Exception e) {
-                                    log.error("Dump stacktrace to log", e);
-
                                     StringWriter sw = new StringWriter();
                                     PrintWriter pw = new PrintWriter(sw);
                                     e.printStackTrace(pw);
@@ -69,6 +67,7 @@ public class ReduceStacksNeededDuringRoutingTest extends 
ContextTestSupport {
                                         count++;
                                     }
                                     log.info("There is " + count + " lines in 
the stacktrace");
+                                    log.error("Dump stacktrace to log", e);
                                 }
                             }
                         })

http://git-wip-us.apache.org/repos/asf/camel/blob/9ede1339/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java
index f80f53f..3b34ad2 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitWithEndTest.java
@@ -35,8 +35,6 @@ public class SplitWithEndTest extends ContextTestSupport {
         // use navigate to find that the end works as expected
         Navigate<Processor> nav = context.getRoutes().get(0).navigate();
         List<Processor> node = nav.next();
-        node = ((Navigate<Processor>) node.get(0)).next();
-        node = ((Navigate<Processor>) node.get(0)).next();
 
         // there should be 4 outputs as the end in the otherwise should
         // ensure that the transform and last send is not within the choice

Reply via email to