Updated Branches:
  refs/heads/master 10cb2ddbf -> 363f892d6

CAMEL-6377: Optimized routing engine to reduce stack frames in use during 
routing. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/363f892d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/363f892d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/363f892d

Branch: refs/heads/master
Commit: 363f892d6b9f3b8afac9bc920c419ea5a1dc2b87
Parents: 10cb2dd
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat May 25 19:45:02 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun May 26 10:15:29 2013 +0200

----------------------------------------------------------------------
 .../camel/management/InstrumentationProcessor.java |    2 +-
 .../apache/camel/model/AggregateDefinition.java    |   11 +++++++----
 .../apache/camel/model/OnCompletionDefinition.java |   14 ++++++++------
 .../org/apache/camel/model/WireTapDefinition.java  |   11 +++++++----
 4 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/363f892d/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
index a406cf7..e6c3352 100644
--- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
@@ -69,7 +69,7 @@ public class InstrumentationProcessor extends DelegateAsyncProcessor {
         // only record time if stats is enabled
         final StopWatch watch = (counter != null && 
counter.isStatisticsEnabled()) ? new StopWatch() : null;
 
-        return super.process(exchange, new AsyncCallback() {
+        return processor.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 try {
                     // record end time

http://git-wip-us.apache.org/repos/asf/camel/blob/363f892d/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index fce58e5..65768f3 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -34,6 +34,7 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.model.language.ExpressionDefinition;
+import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -157,9 +158,11 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     }
 
     protected AggregateProcessor createAggregator(RouteContext routeContext) 
throws Exception {
-        Processor processor = this.createChildProcessor(routeContext, true);
-        // wrap the aggregated route in a unit of work processor
-        processor = new UnitOfWorkProcessor(routeContext, processor);
+        Processor childProcessor = this.createChildProcessor(routeContext, 
true);
+
+        // wrap the aggregate route in a unit of work processor
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(childProcessor);
+        internal.addTask(new 
CamelInternalProcessor.UnitOfWorkProcessorTask(routeContext.getRoute().getId()));
 
         Expression correlation = 
getExpression().createExpression(routeContext);
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
@@ -173,7 +176,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
             shutdownThreadPool = true;
         }
 
-        AggregateProcessor answer = new 
AggregateProcessor(routeContext.getCamelContext(), processor,
+        AggregateProcessor answer = new 
AggregateProcessor(routeContext.getCamelContext(), internal,
                 correlation, strategy, threadPool, shutdownThreadPool);
 
         AggregationRepository repository = 
createAggregationRepository(routeContext);

http://git-wip-us.apache.org/repos/asf/camel/blob/363f892d/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
index 08f64fa..09ffe21 100644
--- a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -34,8 +33,8 @@ import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.OnCompletionProcessor;
-import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -117,12 +116,15 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
             throw new IllegalArgumentException("Both onCompleteOnly and 
onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
         }
 
+        String id = routeContext.getRoute().getId();
+
         Processor childProcessor = this.createChildProcessor(routeContext, 
true);
+
         // wrap the on completion route in a unit of work processor
-        childProcessor = new UnitOfWorkProcessor(routeContext, childProcessor);
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(childProcessor);
+        internal.addTask(new 
CamelInternalProcessor.UnitOfWorkProcessorTask(id));
 
-        String id = routeContext.getRoute().getId();
-        onCompletions.put(id, childProcessor);
+        onCompletions.put(id, internal);
 
         Predicate when = null;
         if (onWhen != null) {
@@ -135,7 +137,7 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
 
         // should be false by default
         boolean original = getUseOriginalMessagePolicy() != null ? 
getUseOriginalMessagePolicy() : false;
-        OnCompletionProcessor answer = new 
OnCompletionProcessor(routeContext.getCamelContext(), childProcessor,
+        OnCompletionProcessor answer = new 
OnCompletionProcessor(routeContext.getCamelContext(), internal,
                 threadPool, shutdownThreadPool, isOnCompleteOnly(), 
isOnFailureOnly(), when, original);
         return answer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/363f892d/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
index cb35864..f3f53cf 100644
--- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -33,7 +33,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.processor.UnitOfWorkProcessor;
+import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.WireTapProcessor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CamelContextHelper;
@@ -89,12 +89,15 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N
         // create the producer to send to the wire tapped endpoint
         Endpoint endpoint = resolveEndpoint(routeContext);
         Producer producer = endpoint.createProducer();
+
         // create error handler we need to use for processing the wire tapped
         Processor target = wrapInErrorHandler(routeContext, producer);
-        // and wrap in UoW, which is needed for error handler as well
-        target = new UnitOfWorkProcessor(routeContext, target);
 
-        WireTapProcessor answer = new WireTapProcessor(endpoint, target, 
getPattern(), threadPool, shutdownThreadPool);
+        // and wrap in unit of work
+        CamelInternalProcessor internal = new CamelInternalProcessor(target);
+        internal.addTask(new 
CamelInternalProcessor.UnitOfWorkProcessorTask(routeContext.getRoute().getId()));
+
+        WireTapProcessor answer = new WireTapProcessor(endpoint, internal, 
getPattern(), threadPool, shutdownThreadPool);
         answer.setCopy(isCopy());
         if (newExchangeProcessorRef != null) {
             newExchangeProcessor = 
routeContext.mandatoryLookup(newExchangeProcessorRef, Processor.class);

Reply via email to