Updated Branches: refs/heads/master 9406f319d -> ad4f5116f
CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ad4f5116 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ad4f5116 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ad4f5116 Branch: refs/heads/master Commit: ad4f5116f45c26a4a5b2de69d3042342ee4125a9 Parents: 9406f31 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 26 12:09:01 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 26 12:09:01 2013 +0200 ---------------------------------------------------------------------- .../camel/impl/CamelPostProcessorHelper.java | 6 ++++-- .../apache/camel/model/ResequenceDefinition.java | 15 +++++++++++++-- .../org/apache/camel/processor/BatchProcessor.java | 3 +-- 3 files changed, 18 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ad4f5116/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java index 8c5d13e..a8dc64b 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java +++ b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java @@ -35,7 +35,7 @@ import org.apache.camel.Service; import org.apache.camel.component.bean.BeanInfo; import org.apache.camel.component.bean.BeanProcessor; import org.apache.camel.component.bean.ProxyHelper; -import org.apache.camel.processor.UnitOfWorkProcessor; +import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.UnitOfWorkProducer; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.IntrospectionSupport; @@ -129,7 +129,9 @@ public class CamelPostProcessorHelper implements CamelContextAware { BeanInfo info = new BeanInfo(getCamelContext(), method); BeanProcessor answer = new BeanProcessor(pojo, info); // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked - return new UnitOfWorkProcessor(answer); + CamelInternalProcessor internal = new CamelInternalProcessor(answer); + internal.addTask(new CamelInternalProcessor.UnitOfWorkProcessorTask(null)); + return internal; } public Endpoint getEndpointInjection(Object bean, String uri, String name, String propertyName, http://git-wip-us.apache.org/repos/asf/camel/blob/ad4f5116/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java index cdece76..c208086 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java @@ -32,6 +32,7 @@ import org.apache.camel.model.config.BatchResequencerConfig; import org.apache.camel.model.config.ResequencerConfig; import org.apache.camel.model.config.StreamResequencerConfig; import org.apache.camel.model.language.ExpressionDefinition; +import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.Resequencer; import org.apache.camel.processor.StreamResequencer; import org.apache.camel.processor.resequencer.ExpressionResultComparator; @@ -345,10 +346,15 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti Processor processor = this.createChildProcessor(routeContext, true); Expression expression = getExpression().createExpression(routeContext); + // and wrap in unit of work + String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); + CamelInternalProcessor internal = new CamelInternalProcessor(processor); + internal.addTask(new CamelInternalProcessor.UnitOfWorkProcessorTask(routeId)); + ObjectHelper.notNull(config, "config", this); ObjectHelper.notNull(expression, "expression", this); - Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, expression, + Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), internal, expression, config.isAllowDuplicates(), config.isReverse()); resequencer.setBatchSize(config.getBatchSize()); resequencer.setBatchTimeout(config.getBatchTimeout()); @@ -371,13 +377,18 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti Processor processor = this.createChildProcessor(routeContext, true); Expression expression = getExpression().createExpression(routeContext); + // and wrap in unit of work + String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); + CamelInternalProcessor internal = new CamelInternalProcessor(processor); + internal.addTask(new CamelInternalProcessor.UnitOfWorkProcessorTask(routeId)); + ObjectHelper.notNull(config, "config", this); ObjectHelper.notNull(expression, "expression", this); ExpressionResultComparator comparator = config.getComparator(); comparator.setExpression(expression); - StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, comparator); + StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator); resequencer.setTimeout(config.getTimeout()); resequencer.setCapacity(config.getCapacity()); resequencer.setRejectOld(config.getRejectOld()); http://git-wip-us.apache.org/repos/asf/camel/blob/ad4f5116/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java index 1ffe3a9..5bb8b4c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java @@ -81,7 +81,7 @@ public class BatchProcessor extends ServiceSupport implements Processor, Navigat // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW this.camelContext = camelContext; - this.processor = new UnitOfWorkProcessor(processor); + this.processor = processor; this.collection = collection; this.expression = expression; this.sender = new BatchSender(); @@ -236,7 +236,6 @@ public class BatchProcessor extends ServiceSupport implements Processor, Navigat protected void doStop() throws Exception { sender.cancel(); - ServiceHelper.stopServices(sender); ServiceHelper.stopServices(processor); collection.clear(); }