Updated Branches: refs/heads/master 13a454b27 -> 58fc1ffbc
CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/58fc1ffb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/58fc1ffb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/58fc1ffb Branch: refs/heads/master Commit: 58fc1ffbc55328e4fbf781810112e6cada861d86 Parents: 13a454b Author: Claus Ibsen <davscl...@apache.org> Authored: Tue May 28 12:05:57 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue May 28 12:38:01 2013 +0200 ---------------------------------------------------------------------- .../org/apache/camel/model/ProcessDefinition.java | 12 ++- .../camel/processor/DelegateAsyncProcessor.java | 3 +- .../camel/processor/DelegateSyncProcessor.java | 103 +++++++++++++++ 3 files changed, 115 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/58fc1ffb/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java index f908297..03c9b2d 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java @@ -22,9 +22,11 @@ import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; +import org.apache.camel.AsyncProcessor; import org.apache.camel.Processor; import org.apache.camel.Service; -import org.apache.camel.processor.WrapProcessor; +import org.apache.camel.processor.DelegateAsyncProcessor; +import org.apache.camel.processor.DelegateSyncProcessor; import org.apache.camel.spi.Required; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ObjectHelper; @@ -92,7 +94,13 @@ public class ProcessDefinition extends NoOutputDefinition<ProcessDefinition> { // ensure its wrapped in a Service so we can manage it from eg. JMX // (a Processor must be a Service to be enlisted in JMX) if (!(answer instanceof Service)) { - answer = new WrapProcessor(answer, answer); + if (answer instanceof AsyncProcessor) { + // the processor is async by nature so use the async delegate + answer = new DelegateAsyncProcessor(answer); + } else { + // the processor is sync by nature so use the sync delegate + answer = new DelegateSyncProcessor(answer); + } } return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/58fc1ffb/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java index 38cfa44..7411b8a 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java @@ -37,7 +37,8 @@ import org.apache.camel.util.ServiceHelper; * <b>Important:</b> This implementation <b>does</b> support the asynchronous routing engine. * If you are implementing a EIP pattern please use this as the delegate. * - * @version + * @version + * @see DelegateSyncProcessor * @see org.apache.camel.processor.DelegateProcessor */ public class DelegateAsyncProcessor extends ServiceSupport implements DelegateProcessor, AsyncProcessor, Navigate<Processor> { http://git-wip-us.apache.org/repos/asf/camel/blob/58fc1ffb/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java new file mode 100644 index 0000000..ceaccff --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Exchange; +import org.apache.camel.Navigate; +import org.apache.camel.Processor; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ServiceHelper; + +/** + * A Delegate pattern which delegates synchronous processing to a nested {@link org.apache.camel.Processor} which can + * be useful for implementation inheritance when writing an {@link org.apache.camel.spi.Policy} + * <p/> + * <b>Important:</b> This implementation <b>does</b> support the asynchronous routing engine, <b>only</b>. + * if the logic in the {@link #process(org.apache.camel.Exchange)} does not invoke EIPs; as it forces using + * synchronous processing during the {@link #process(org.apache.camel.Exchange)} method call. + * If you are implementing a EIP pattern please use this as the delegate, for simple EIPs. + * + * @version + * @see DelegateAsyncProcessor + * @see org.apache.camel.processor.DelegateProcessor + */ +public class DelegateSyncProcessor extends ServiceSupport implements org.apache.camel.DelegateProcessor, AsyncProcessor, Navigate<Processor> { + protected Processor processor; + + public DelegateSyncProcessor(Processor processor) { + this.processor = processor; + } + + @Override + public String toString() { + return "DelegateSync[" + processor + "]"; + } + + public Processor getProcessor() { + return processor; + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + // force calling the sync method + try { + processor.process(exchange); + } catch (Throwable e) { + // must catch throwable so we catch all + exchange.setException(e); + } finally { + // we are bridging a sync processor as async so callback with true + callback.done(true); + } + return true; + } + + @Override + public void process(Exchange exchange) throws Exception { + processor.process(exchange); + } + + @Override + public boolean hasNext() { + return processor != null; + } + + @Override + public List<Processor> next() { + if (!hasNext()) { + return null; + } + List<Processor> answer = new ArrayList<Processor>(1); + answer.add(processor); + return answer; + } + + @Override + protected void doStart() throws Exception { + ServiceHelper.startService(processor); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopServices(processor); + } +}