Updated Branches: refs/heads/master 4653c9531 -> 8c5d71b3e
CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8c5d71b3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8c5d71b3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8c5d71b3 Branch: refs/heads/master Commit: 8c5d71b3e5b616c387d561414bad72f3549ee60a Parents: 4653c95 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 26 16:51:49 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 26 17:21:49 2013 +0200 ---------------------------------------------------------------------- .../component/directvm/DirectVmProcessor.java | 2 +- .../org/apache/camel/processor/CatchProcessor.java | 2 +- .../camel/processor/DelayProcessorSupport.java | 6 +- .../camel/processor/FatalFallbackErrorHandler.java | 2 +- .../apache/camel/processor/FilterProcessor.java | 2 +- .../apache/camel/processor/FinallyProcessor.java | 2 +- .../org/apache/camel/processor/LoopProcessor.java | 2 +- .../camel/processor/RouteContextProcessor.java | 70 -------- .../RouteInflightRepositoryProcessor.java | 65 ------- .../camel/processor/RoutePolicyProcessor.java | 133 --------------- .../apache/camel/processor/SamplingThrottler.java | 4 +- .../interceptor/HandleFaultInterceptor.java | 2 +- .../interceptor/StreamCachingInterceptor.java | 2 +- .../processor/interceptor/TraceInterceptor.java | 2 +- .../camel/management/EndpointCompletionTest.java | 15 +- 15 files changed, 24 insertions(+), 287 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java index 5cec883..cb50f69 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java @@ -52,7 +52,7 @@ public final class DirectVmProcessor extends DelegateAsyncProcessor { Thread.currentThread().setContextClassLoader(appClassLoader); changed = true; } - return getProcessor().process(copy, new AsyncCallback() { + return processor.process(copy, new AsyncCallback() { @Override public void done(boolean done) { try { http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java index 48c1c10..1e06330 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java @@ -89,7 +89,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable new Object[]{handled, e.getClass().getName(), e.getMessage()}); } - boolean sync = super.process(exchange, new AsyncCallback() { + boolean sync = processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { if (!handled) { if (exchange.getException() == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java index 8a3f4b7..a5729b3 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java +++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java @@ -62,7 +62,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { } // process the exchange now that we woke up - DelayProcessorSupport.super.process(exchange, new AsyncCallback() { + DelayProcessorSupport.this.processor.process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { log.trace("Delayed task done for exchangeId: {}", exchange.getExchangeId()); @@ -114,7 +114,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { try { delay(delay, exchange); // then continue routing - return super.process(exchange, callback); + return processor.process(exchange, callback); } catch (Exception e) { // exception occurred so we are done exchange.setException(e); @@ -143,7 +143,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { exchange.setException(ie); } // then continue routing - return super.process(exchange, callback); + return processor.process(exchange, callback); } } else { exchange.setException(e); http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java index 94ef480..b9feb8b 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/FatalFallbackErrorHandler.java @@ -39,7 +39,7 @@ public class FatalFallbackErrorHandler extends DelegateAsyncProcessor implements @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { // support the asynchronous routing engine - boolean sync = super.process(exchange, new AsyncCallback() { + boolean sync = processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { if (exchange.getException() != null) { // an exception occurred during processing onException http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java index 4efe06a..aebc12d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java @@ -55,7 +55,7 @@ public class FilterProcessor extends DelegateAsyncProcessor implements Traceable exchange.setProperty(Exchange.FILTER_MATCHED, matches); if (matches) { - return super.process(exchange, callback); + return processor.process(exchange, callback); } else { callback.done(true); return true; http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java index deb6bcf..f75b164 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java @@ -51,7 +51,7 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); } - boolean sync = super.process(exchange, new AsyncCallback() { + boolean sync = processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { if (e == null) { exchange.removeProperty(Exchange.FAILURE_ENDPOINT); http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java index 6db4e72..df2baed 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -100,7 +100,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable { LOG.debug("LoopProcessor: iteration #{}", index.get()); exchange.setProperty(Exchange.LOOP_INDEX, index.get()); - boolean sync = super.process(exchange, new AsyncCallback() { + boolean sync = processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the routing slip if (doneSync) { http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java deleted file mode 100644 index 195d399..0000000 --- a/camel-core/src/main/java/org/apache/camel/processor/RouteContextProcessor.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.processor; - -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.spi.RouteContext; -import org.apache.camel.spi.UnitOfWork; - -/** - * This processor tracks the current {@link RouteContext} while processing the {@link Exchange}. - * This ensures that the {@link Exchange} have details under which route its being currently processed. - */ -@Deprecated -public class RouteContextProcessor extends DelegateAsyncProcessor { - - private final RouteContext routeContext; - - public RouteContextProcessor(RouteContext routeContext, Processor processor) { - super(processor); - this.routeContext = routeContext; - } - - @Override - public boolean process(final Exchange exchange, final AsyncCallback callback) { - // push the current route context - final UnitOfWork unitOfWork = exchange.getUnitOfWork(); - if (unitOfWork != null) { - unitOfWork.pushRouteContext(routeContext); - } - - boolean sync = processor.process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - try { - // pop the route context we just used - if (unitOfWork != null) { - unitOfWork.popRouteContext(); - } - } catch (Exception e) { - exchange.setException(e); - } finally { - callback.done(doneSync); - } - } - }); - - return sync; - } - - @Override - public String toString() { - return "RouteContextProcessor[" + processor + "]"; - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java deleted file mode 100644 index 1385fa9..0000000 --- a/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.processor; - -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.Route; -import org.apache.camel.spi.InflightRepository; -import org.apache.camel.spi.RouteContext; - -/** - * This processor tracks the current {@link RouteContext} while processing the {@link Exchange}. - * This ensures that the {@link Exchange} have details under which route its being currently processed. - */ -@Deprecated -public class RouteInflightRepositoryProcessor extends DelegateAsyncProcessor { - - private final InflightRepository inflightRepository; - private String id; - - public RouteInflightRepositoryProcessor(InflightRepository inflightRepository, Processor processor) { - super(processor); - this.inflightRepository = inflightRepository; - } - - public void setRoute(Route route) { - this.id = route.getId(); - } - - @Override - public boolean process(final Exchange exchange, final AsyncCallback callback) { - inflightRepository.add(exchange, id); - - boolean sync = processor.process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - try { - inflightRepository.remove(exchange, id); - } finally { - callback.done(doneSync); - } - } - }); - return sync; - } - - @Override - public String toString() { - return super.toString(); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java deleted file mode 100644 index 37d7ffb..0000000 --- a/camel-core/src/main/java/org/apache/camel/processor/RoutePolicyProcessor.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.processor; - -import java.util.List; - -import org.apache.camel.AsyncCallback; -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.Route; -import org.apache.camel.StatefulService; -import org.apache.camel.spi.RoutePolicy; -import org.apache.camel.support.SynchronizationAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link Processor} which instruments the {@link RoutePolicy}. - * - * @version - */ -@Deprecated -public class RoutePolicyProcessor extends DelegateAsyncProcessor { - - private static final Logger LOG = LoggerFactory.getLogger(RoutePolicyProcessor.class); - private final List<RoutePolicy> routePolicies; - private Route route; - - public RoutePolicyProcessor(Processor processor, List<RoutePolicy> routePolicies) { - super(processor); - this.routePolicies = routePolicies; - } - - @Override - public String toString() { - return "RoutePolicy[" + routePolicies + "]"; - } - - @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - - // invoke begin - for (RoutePolicy policy : routePolicies) { - try { - if (isRoutePolicyRunAllowed(policy)) { - policy.onExchangeBegin(route, exchange); - } - } catch (Exception e) { - LOG.warn("Error occurred during onExchangeBegin on RoutePolicy: " + policy - + ". This exception will be ignored", e); - } - } - - // add on completion that invokes the policy callback on complete - // as the Exchange can be routed async and thus we need the callback to - // invoke when the route is completed - exchange.addOnCompletion(new SynchronizationAdapter() { - @Override - public void onDone(Exchange exchange) { - // do not invoke it if Camel is stopping as we don't want - // the policy to start a consumer during Camel is stopping - if (isCamelStopping(exchange.getContext())) { - return; - } - - for (RoutePolicy policy : routePolicies) { - try { - if (isRoutePolicyRunAllowed(policy)) { - policy.onExchangeDone(route, exchange); - } - } catch (Exception e) { - LOG.warn("Error occurred during onExchangeDone on RoutePolicy: " + policy - + ". This exception will be ignored", e); - } - } - } - - @Override - public String toString() { - return "RoutePolicyOnCompletion"; - } - }); - - return super.process(exchange, callback); - } - - /** - * Sets the route this policy applies. - * - * @param route the route - */ - public void setRoute(Route route) { - this.route = route; - } - - /** - * Strategy to determine if this policy is allowed to run - * - * @param policy the policy - * @return <tt>true</tt> to run - */ - protected boolean isRoutePolicyRunAllowed(RoutePolicy policy) { - if (policy instanceof StatefulService) { - StatefulService ss = (StatefulService) policy; - return ss.isRunAllowed(); - } - return true; - } - - private static boolean isCamelStopping(CamelContext context) { - if (context instanceof StatefulService) { - StatefulService ss = (StatefulService) context; - return ss.isStopping() || ss.isStopped(); - } - return false; - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java b/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java index 5896181..055890e 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java @@ -48,7 +48,7 @@ public class SamplingThrottler extends DelegateAsyncProcessor { private TimeUnit units; private long timeOfLastExchange; private StopProcessor stopper = new StopProcessor(); - private Object calculationLock = new Object(); + private final Object calculationLock = new Object(); private SampleStats sampled = new SampleStats(); public SamplingThrottler(Processor processor, long messageFrequency) { @@ -120,7 +120,7 @@ public class SamplingThrottler extends DelegateAsyncProcessor { if (doSend) { // continue routing - return super.process(exchange, callback); + return processor.process(exchange, callback); } else { // okay to invoke this synchronously as the stopper // will just set a property http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java index 1ae81de..cc6bbe5 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java @@ -38,7 +38,7 @@ public class HandleFaultInterceptor extends DelegateAsyncProcessor { @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { - return getProcessor().process(exchange, new AsyncCallback() { + return processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { try { // handle fault after we are done http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java index bee8418..8eafb13 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java @@ -49,7 +49,7 @@ public class StreamCachingInterceptor extends DelegateAsyncProcessor { } MessageHelper.resetStreamCache(exchange.getIn()); - return getProcessor().process(exchange, callback); + return processor.process(exchange, callback); } } http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java index d4ab98b..263d39f 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java @@ -160,7 +160,7 @@ public class TraceInterceptor extends DelegateAsyncProcessor implements Exchange } // process the exchange - sync = super.process(exchange, new AsyncCallback() { + sync = processor.process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { try { http://git-wip-us.apache.org/repos/asf/camel/blob/8c5d71b3/camel-core/src/test/java/org/apache/camel/management/EndpointCompletionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/EndpointCompletionTest.java b/camel-core/src/test/java/org/apache/camel/management/EndpointCompletionTest.java index 6a37c5e..4cf9b1a 100644 --- a/camel-core/src/test/java/org/apache/camel/management/EndpointCompletionTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/EndpointCompletionTest.java @@ -28,11 +28,6 @@ import org.slf4j.LoggerFactory; public class EndpointCompletionTest extends ManagementTestSupport { private static final transient Logger LOG = LoggerFactory.getLogger(EndpointCompletionTest.class); - @Override - public boolean isUseRouteBuilder() { - return false; - } - @SuppressWarnings("unchecked") public void testEndpointCompletion() throws Exception { MBeanServer mbeanServer = getMBeanServer(); @@ -85,4 +80,14 @@ public class EndpointCompletionTest extends ManagementTestSupport { return answer; } + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // noop + } + }; + } + }