CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9abffe3c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9abffe3c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9abffe3c Branch: refs/heads/master Commit: 9abffe3c3f3798956a5cc053f07a793e5a3cab6f Parents: 9478086 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 26 19:09:32 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 26 19:09:32 2013 +0200 ---------------------------------------------------------------------- .../camel/processor/CamelInternalProcessor.java | 32 ++++++++++++- .../processor/interceptor/DefaultChannel.java | 36 +-------------- 2 files changed, 31 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9abffe3c/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index c2e745e..f56a63e 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Queue; +import java.util.concurrent.RejectedExecutionException; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -109,8 +110,8 @@ public final class CamelInternalProcessor extends DelegateAsyncProcessor { // ---------------------------------------------------------- - if (processor == null) { - // no processor then we are done + if (processor == null || !continueProcessing(exchange)) { + // no processor or we should not continue then we are done callback.done(true); return true; } @@ -230,6 +231,33 @@ public final class CamelInternalProcessor extends DelegateAsyncProcessor { } } + /** + * Strategy to determine if we should continue processing the {@link Exchange}. + */ + protected boolean continueProcessing(Exchange exchange) { + Object stop = exchange.getProperty(Exchange.ROUTE_STOP); + if (stop != null) { + boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); + if (doStop) { + LOG.debug("Exchange is marked to stop routing: {}", exchange); + return false; + } + } + + // determine if we can still run, or the camel context is forcing a shutdown + boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this); + if (forceShutdown) { + LOG.debug("Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: {}", exchange); + if (exchange.getException() == null) { + exchange.setException(new RejectedExecutionException()); + } + return false; + } + + // yes we can continue + return true; + } + public static class InstrumentationTask implements CamelInternalProcessorTask<StopWatch> { private PerformanceCounter counter; http://git-wip-us.apache.org/repos/asf/camel/blob/9abffe3c/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java index 6d91a02..4634ec6 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.RejectedExecutionException; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -386,43 +385,10 @@ public class DefaultChannel extends ServiceSupport implements ModelChannel { } public boolean process(final Exchange exchange, final AsyncCallback callback) { - // TODO: This logic can be in internal processor - if (!continueProcessing(exchange)) { - // we should not continue routing so we are done - callback.done(true); - return true; - } - + // TODO: We do not need to have DefaultChannel wrapped in the routes, but can just rely on CamelInternalProcessor return internalProcessor.process(exchange, callback); } - /** - * Strategy to determine if we should continue processing the {@link Exchange}. - */ - protected boolean continueProcessing(Exchange exchange) { - Object stop = exchange.getProperty(Exchange.ROUTE_STOP); - if (stop != null) { - boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); - if (doStop) { - LOG.debug("Exchange is marked to stop routing: {}", exchange); - return false; - } - } - - // determine if we can still run, or the camel context is forcing a shutdown - boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this); - if (forceShutdown) { - LOG.debug("Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: {}", exchange); - if (exchange.getException() == null) { - exchange.setException(new RejectedExecutionException()); - } - return false; - } - - // yes we can continue - return true; - } - @Override public String toString() { // just output the next processor as all the interceptors and error handler is just too verbose