Updated Branches: refs/heads/master 10cb2ddbf -> 363f892d6
CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/363f892d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/363f892d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/363f892d Branch: refs/heads/master Commit: 363f892d6b9f3b8afac9bc920c419ea5a1dc2b87 Parents: 10cb2dd Author: Claus Ibsen <davscl...@apache.org> Authored: Sat May 25 19:45:02 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 26 10:15:29 2013 +0200 ---------------------------------------------------------------------- .../camel/management/InstrumentationProcessor.java | 2 +- .../apache/camel/model/AggregateDefinition.java | 11 +++++++---- .../apache/camel/model/OnCompletionDefinition.java | 14 ++++++++------ .../org/apache/camel/model/WireTapDefinition.java | 11 +++++++---- 4 files changed, 23 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/363f892d/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java index a406cf7..e6c3352 100644 --- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java @@ -69,7 +69,7 @@ public class InstrumentationProcessor extends DelegateAsyncProcessor { // only record time if stats is enabled final StopWatch watch = (counter != null && counter.isStatisticsEnabled()) ? new StopWatch() : null; - return super.process(exchange, new AsyncCallback() { + return processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { try { // record end time http://git-wip-us.apache.org/repos/asf/camel/blob/363f892d/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index fce58e5..65768f3 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -34,6 +34,7 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.builder.ExpressionClause; import org.apache.camel.model.language.ExpressionDefinition; +import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.UnitOfWorkProcessor; import org.apache.camel.processor.aggregate.AggregateProcessor; import org.apache.camel.processor.aggregate.AggregationStrategy; @@ -157,9 +158,11 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition } protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception { - Processor processor = this.createChildProcessor(routeContext, true); - // wrap the aggregated route in a unit of work processor - processor = new UnitOfWorkProcessor(routeContext, processor); + Processor childProcessor = this.createChildProcessor(routeContext, true); + + // wrap the aggregate route in a unit of work processor + CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); + internal.addTask(new CamelInternalProcessor.UnitOfWorkProcessorTask(routeContext.getRoute().getId())); Expression correlation = getExpression().createExpression(routeContext); AggregationStrategy strategy = createAggregationStrategy(routeContext); @@ -173,7 +176,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition shutdownThreadPool = true; } - AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, + AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal, correlation, strategy, threadPool, shutdownThreadPool); AggregationRepository repository = createAggregationRepository(routeContext); http://git-wip-us.apache.org/repos/asf/camel/blob/363f892d/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java index 08f64fa..09ffe21 100644 --- a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; - import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; @@ -34,8 +33,8 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.Predicate; import org.apache.camel.Processor; +import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.OnCompletionProcessor; -import org.apache.camel.processor.UnitOfWorkProcessor; import org.apache.camel.spi.RouteContext; /** @@ -117,12 +116,15 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); } + String id = routeContext.getRoute().getId(); + Processor childProcessor = this.createChildProcessor(routeContext, true); + // wrap the on completion route in a unit of work processor - childProcessor = new UnitOfWorkProcessor(routeContext, childProcessor); + CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); + internal.addTask(new CamelInternalProcessor.UnitOfWorkProcessorTask(id)); - String id = routeContext.getRoute().getId(); - onCompletions.put(id, childProcessor); + onCompletions.put(id, internal); Predicate when = null; if (onWhen != null) { @@ -135,7 +137,7 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi // should be false by default boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false; - OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor, + OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal, threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original); return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/363f892d/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java index cb35864..f3f53cf 100644 --- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java @@ -33,7 +33,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.processor.UnitOfWorkProcessor; +import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.WireTapProcessor; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.CamelContextHelper; @@ -89,12 +89,15 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N // create the producer to send to the wire tapped endpoint Endpoint endpoint = resolveEndpoint(routeContext); Producer producer = endpoint.createProducer(); + // create error handler we need to use for processing the wire tapped Processor target = wrapInErrorHandler(routeContext, producer); - // and wrap in UoW, which is needed for error handler as well - target = new UnitOfWorkProcessor(routeContext, target); - WireTapProcessor answer = new WireTapProcessor(endpoint, target, getPattern(), threadPool, shutdownThreadPool); + // and wrap in unit of work + CamelInternalProcessor internal = new CamelInternalProcessor(target); + internal.addTask(new CamelInternalProcessor.UnitOfWorkProcessorTask(routeContext.getRoute().getId())); + + WireTapProcessor answer = new WireTapProcessor(endpoint, internal, getPattern(), threadPool, shutdownThreadPool); answer.setCopy(isCopy()); if (newExchangeProcessorRef != null) { newExchangeProcessor = routeContext.mandatoryLookup(newExchangeProcessorRef, Processor.class);