Repository: camel Updated Branches: refs/heads/master e007a6c2c -> e37cf3fde
camel-connector before/after should support async routing engine via pipeline which also is simpler code Modifies the `DefaultConnectorEndpoint` to create `Pipeline` based `Consumer`s and `Producer`s. Additional helper method was added to `Pipeline` to create it for such use cases. Cleans up `Pipeline` by adding `@Override` and removes interfaces that are already inherited from `MulticastProcessor`. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e37cf3fd Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e37cf3fd Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e37cf3fd Branch: refs/heads/master Commit: e37cf3fde55946973dd121cfb48ce7ae4806adc2 Parents: e007a6c Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Oct 6 10:14:28 2017 +0200 Committer: Zoran Regvart <zregv...@apache.org> Committed: Fri Oct 6 11:58:21 2017 +0200 ---------------------------------------------------------------------- .../org/apache/camel/processor/Pipeline.java | 33 +++++-- .../connector/ConnectorConsumerProcessor.java | 94 -------------------- .../component/connector/ConnectorProducer.java | 50 +++-------- .../connector/DefaultConnectorEndpoint.java | 25 ++++-- 4 files changed, 59 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e37cf3fd/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java index b35252d..e75c20c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java @@ -16,6 +16,7 @@ */ package org.apache.camel.processor; +import java.util.ArrayList; import 
java.util.Collection; import java.util.Iterator; import java.util.List; @@ -25,8 +26,6 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.Traceable; -import org.apache.camel.spi.IdAware; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; @@ -41,7 +40,7 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing; * * @version */ -public class Pipeline extends MulticastProcessor implements AsyncProcessor, Traceable, IdAware { +public class Pipeline extends MulticastProcessor { private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); private String id; @@ -59,10 +58,29 @@ public class Pipeline extends MulticastProcessor implements AsyncProcessor, Trac return new Pipeline(camelContext, processors); } + public static Processor newInstance(final CamelContext camelContext, final Processor... 
processors) { + if (processors == null || processors.length == 0) { + return null; + } else if (processors.length == 1) { + return processors[0]; + } + + final List<Processor> toBeProcessed = new ArrayList<>(processors.length); + for (Processor processor : processors) { + if (processor != null) { + toBeProcessed.add(processor); + } + } + + return new Pipeline(camelContext, toBeProcessed); + } + + @Override public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } + @Override public boolean process(Exchange exchange, AsyncCallback callback) { Iterator<Processor> processors = getProcessors().iterator(); Exchange nextExchange = exchange; @@ -118,7 +136,8 @@ public class Pipeline extends MulticastProcessor implements AsyncProcessor, Trac // implement asynchronous routing logic in callback so we can have the callback being // triggered and then continue routing where we left boolean sync = asyncProcessor.process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { + @Override + public void done(final boolean doneSync) { // we only have to handle async completion of the pipeline if (doneSync) { return; @@ -135,8 +154,8 @@ public class Pipeline extends MulticastProcessor implements AsyncProcessor, Trac } nextExchange = createNextExchange(nextExchange); - doneSync = process(original, nextExchange, callback, processors, processor); - if (!doneSync) { + boolean isDoneSync = process(original, nextExchange, callback, processors, processor); + if (!isDoneSync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); return; } @@ -192,10 +211,12 @@ public class Pipeline extends MulticastProcessor implements AsyncProcessor, Trac return "pipeline"; } + @Override public String getId() { return id; } + @Override public void setId(String id) { this.id = id; } 
http://git-wip-us.apache.org/repos/asf/camel/blob/e37cf3fd/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java deleted file mode 100644 index 53c3860..0000000 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.connector; - -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.processor.DelegateAsyncProcessor; -import org.apache.camel.util.ServiceHelper; - -/** - * Connector {@link Processor} which is capable of performing before and after custom processing - * while consuming a message (ie from the consumer). 
- */ -public class ConnectorConsumerProcessor extends DelegateAsyncProcessor { - - private final Processor beforeConsumer; - private final Processor afterConsumer; - - public ConnectorConsumerProcessor(Processor processor, Processor beforeConsumer, Processor afterConsumer) { - super(processor); - this.beforeConsumer = beforeConsumer; - this.afterConsumer = afterConsumer; - } - - @Override - public boolean process(Exchange exchange, final AsyncCallback callback) { - // setup callback for after consumer - AsyncCallback delegate = doneSync -> { - if (afterConsumer != null) { - try { - afterConsumer.process(exchange); - } catch (Throwable e) { - exchange.setException(e); - } - } - callback.done(doneSync); - }; - - // perform any before consumer - if (beforeConsumer != null) { - try { - beforeConsumer.process(exchange); - } catch (Throwable e) { - exchange.setException(e); - callback.done(true); - return true; - } - } - - // process the consumer - return super.process(exchange, delegate); - } - - @Override - protected void doStart() throws Exception { - ServiceHelper.startServices(beforeConsumer, processor, afterConsumer); - } - - @Override - protected void doStop() throws Exception { - ServiceHelper.stopServices(beforeConsumer, processor, afterConsumer); - } - - @Override - protected void doSuspend() throws Exception { - ServiceHelper.suspendService(processor); - } - - @Override - protected void doResume() throws Exception { - ServiceHelper.resumeService(processor); - } - - @Override - protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(beforeConsumer, processor, afterConsumer); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/e37cf3fd/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java index b5db55d..28e80d0 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java @@ -23,76 +23,50 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.processor.Pipeline; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.ServiceHelper; /** * Connector {@link Producer} which is capable of performing before and after custom processing - * while processing (ie sending the message). + * via the {@link Pipeline }while processing (ie sending the message). */ public class ConnectorProducer extends DefaultAsyncProducer { - private final AsyncProcessor producer; - private final Processor beforeProducer; - private final Processor afterProducer; + private final AsyncProcessor processor; - public ConnectorProducer(Endpoint endpoint, Producer producer, Processor beforeProducer, Processor afterProducer) { + public ConnectorProducer(final Endpoint endpoint, final Processor processor) { super(endpoint); - this.producer = AsyncProcessorConverterHelper.convert(producer); - this.beforeProducer = beforeProducer; - this.afterProducer = afterProducer; + this.processor = AsyncProcessorConverterHelper.convert(processor); } @Override - public boolean process(Exchange exchange, final AsyncCallback callback) { - // setup callback for after producer - AsyncCallback delegate = doneSync -> { - if (afterProducer != null) { - try { - afterProducer.process(exchange); - } catch (Throwable e) { - exchange.setException(e); - } - } - callback.done(doneSync); - }; - - // perform any before producer - if (beforeProducer != null) { - try { - beforeProducer.process(exchange); - } 
catch (Throwable e) { - exchange.setException(e); - callback.done(true); - return true; - } - } - - return producer.process(exchange, delegate); + public boolean process(final Exchange exchange, final AsyncCallback callback) { + return processor.process(exchange, callback); } @Override protected void doStart() throws Exception { - ServiceHelper.startServices(beforeProducer, producer, afterProducer); + ServiceHelper.startServices(processor); } @Override protected void doStop() throws Exception { - ServiceHelper.stopServices(beforeProducer, producer, afterProducer); + ServiceHelper.stopServices(processor); } @Override protected void doSuspend() throws Exception { - ServiceHelper.suspendService(producer); + ServiceHelper.suspendService(processor); } @Override protected void doResume() throws Exception { - ServiceHelper.resumeService(producer); + ServiceHelper.resumeService(processor); } @Override protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(beforeProducer, producer, afterProducer); + ServiceHelper.stopAndShutdownServices(processor); } } http://git-wip-us.apache.org/repos/asf/camel/blob/e37cf3fd/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java index cb254ce..9e68f0f 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java @@ -24,6 +24,7 @@ import org.apache.camel.Producer; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import 
org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.processor.Pipeline; import org.apache.camel.util.ServiceHelper; @ManagedResource(description = "Managed Connector Endpoint") @@ -43,15 +44,29 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat @Override public Producer createProducer() throws Exception { - Producer producer = endpoint.createProducer(); - return new ConnectorProducer(endpoint, producer, getComponent().getBeforeProducer(), getComponent().getAfterProducer()); + final Producer producer = endpoint.createProducer(); + + final Processor beforeProducer = getComponent().getBeforeProducer(); + final Processor afterProducer = getComponent().getAfterProducer(); + + // use a pipeline to process before, producer, after in that order + // create producer with the pipeline + final Processor pipeline = Pipeline.newInstance(getCamelContext(), beforeProducer, producer, afterProducer); + + return new ConnectorProducer(endpoint, pipeline); } @Override - public Consumer createConsumer(Processor processor) throws Exception { - ConnectorConsumerProcessor delegate = new ConnectorConsumerProcessor(processor, getComponent().getBeforeConsumer(), getComponent().getAfterConsumer()); - Consumer consumer = endpoint.createConsumer(delegate); + public Consumer createConsumer(final Processor processor) throws Exception { + final Processor beforeConsumer = getComponent().getBeforeConsumer(); + final Processor afterConsumer = getComponent().getAfterConsumer(); + + // use a pipeline to process before, processor, after in that order + // create consumer with the pipeline + final Processor pipeline = Pipeline.newInstance(getCamelContext(), beforeConsumer, processor, afterConsumer); + final Consumer consumer = endpoint.createConsumer(pipeline); configureConsumer(consumer); + return consumer; }