Updated Branches:
  refs/heads/master 9406f319d -> ad4f5116f

CAMEL-6377: Optimized routing engine to reduce stack frames in use during 
routing. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ad4f5116
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ad4f5116
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ad4f5116

Branch: refs/heads/master
Commit: ad4f5116f45c26a4a5b2de69d3042342ee4125a9
Parents: 9406f31
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun May 26 12:09:01 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun May 26 12:09:01 2013 +0200

----------------------------------------------------------------------
 .../camel/impl/CamelPostProcessorHelper.java       |    6 ++++--
 .../apache/camel/model/ResequenceDefinition.java   |   15 +++++++++++++--
 .../org/apache/camel/processor/BatchProcessor.java |    3 +--
 3 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ad4f5116/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
index 8c5d13e..a8dc64b 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
@@ -35,7 +35,7 @@ import org.apache.camel.Service;
 import org.apache.camel.component.bean.BeanInfo;
 import org.apache.camel.component.bean.BeanProcessor;
 import org.apache.camel.component.bean.ProxyHelper;
-import org.apache.camel.processor.UnitOfWorkProcessor;
+import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.UnitOfWorkProducer;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.IntrospectionSupport;
@@ -129,7 +129,9 @@ public class CamelPostProcessorHelper implements CamelContextAware {
         BeanInfo info = new BeanInfo(getCamelContext(), method);
         BeanProcessor answer = new BeanProcessor(pojo, info);
         // must ensure the consumer is being executed in an unit of work so 
synchronization callbacks etc is invoked
-        return new UnitOfWorkProcessor(answer);
+        CamelInternalProcessor internal = new CamelInternalProcessor(answer);
+        internal.addTask(new 
CamelInternalProcessor.UnitOfWorkProcessorTask(null));
+        return internal;
     }
 
     public Endpoint getEndpointInjection(Object bean, String uri, String name, 
String propertyName,

http://git-wip-us.apache.org/repos/asf/camel/blob/ad4f5116/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
index cdece76..c208086 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
@@ -32,6 +32,7 @@ import org.apache.camel.model.config.BatchResequencerConfig;
 import org.apache.camel.model.config.ResequencerConfig;
 import org.apache.camel.model.config.StreamResequencerConfig;
 import org.apache.camel.model.language.ExpressionDefinition;
+import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.Resequencer;
 import org.apache.camel.processor.StreamResequencer;
 import org.apache.camel.processor.resequencer.ExpressionResultComparator;
@@ -345,10 +346,15 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti
         Processor processor = this.createChildProcessor(routeContext, true);
         Expression expression = getExpression().createExpression(routeContext);
 
+        // and wrap in unit of work
+        String routeId = 
routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(processor);
+        internal.addTask(new 
CamelInternalProcessor.UnitOfWorkProcessorTask(routeId));
+
         ObjectHelper.notNull(config, "config", this);
         ObjectHelper.notNull(expression, "expression", this);
 
-        Resequencer resequencer = new 
Resequencer(routeContext.getCamelContext(), processor, expression,
+        Resequencer resequencer = new 
Resequencer(routeContext.getCamelContext(), internal, expression,
                 config.isAllowDuplicates(), config.isReverse());
         resequencer.setBatchSize(config.getBatchSize());
         resequencer.setBatchTimeout(config.getBatchTimeout());
@@ -371,13 +377,18 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti
         Processor processor = this.createChildProcessor(routeContext, true);
         Expression expression = getExpression().createExpression(routeContext);
 
+        // and wrap in unit of work
+        String routeId = 
routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(processor);
+        internal.addTask(new 
CamelInternalProcessor.UnitOfWorkProcessorTask(routeId));
+
         ObjectHelper.notNull(config, "config", this);
         ObjectHelper.notNull(expression, "expression", this);
 
         ExpressionResultComparator comparator = config.getComparator();
         comparator.setExpression(expression);
 
-        StreamResequencer resequencer = new 
StreamResequencer(routeContext.getCamelContext(), processor, comparator);
+        StreamResequencer resequencer = new 
StreamResequencer(routeContext.getCamelContext(), internal, comparator);
         resequencer.setTimeout(config.getTimeout());
         resequencer.setCapacity(config.getCapacity());
         resequencer.setRejectOld(config.getRejectOld());

http://git-wip-us.apache.org/repos/asf/camel/blob/ad4f5116/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
index 1ffe3a9..5bb8b4c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
@@ -81,7 +81,7 @@ public class BatchProcessor extends ServiceSupport implements Processor, Navigat
 
         // wrap processor in UnitOfWork so what we send out of the batch runs 
in a UoW
         this.camelContext = camelContext;
-        this.processor = new UnitOfWorkProcessor(processor);
+        this.processor = processor;
         this.collection = collection;
         this.expression = expression;
         this.sender = new BatchSender();
@@ -236,7 +236,6 @@ public class BatchProcessor extends ServiceSupport implements Processor, Navigat
 
     protected void doStop() throws Exception {
         sender.cancel();
-        ServiceHelper.stopServices(sender);
         ServiceHelper.stopServices(processor);
         collection.clear();
     }

Reply via email to