Repository: camel Updated Branches: refs/heads/connector-async [created] 5b1b92669
camel-connector before/after should support async routing engine via pipeline which also is simpler code. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5b1b9266 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5b1b9266 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5b1b9266 Branch: refs/heads/connector-async Commit: 5b1b92669ee238b3a1e2c80545ed88e9464f1298 Parents: a12916c Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Oct 6 10:14:28 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Oct 6 10:14:28 2017 +0200 ---------------------------------------------------------------------- .../connector/ConnectorConsumerProcessor.java | 94 -------------------- .../component/connector/ConnectorProducer.java | 50 +++-------- .../connector/DefaultConnectorEndpoint.java | 34 ++++++- 3 files changed, 42 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5b1b9266/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java deleted file mode 100644 index 53c3860..0000000 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.connector; - -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.processor.DelegateAsyncProcessor; -import org.apache.camel.util.ServiceHelper; - -/** - * Connector {@link Processor} which is capable of performing before and after custom processing - * while consuming a message (ie from the consumer). - */ -public class ConnectorConsumerProcessor extends DelegateAsyncProcessor { - - private final Processor beforeConsumer; - private final Processor afterConsumer; - - public ConnectorConsumerProcessor(Processor processor, Processor beforeConsumer, Processor afterConsumer) { - super(processor); - this.beforeConsumer = beforeConsumer; - this.afterConsumer = afterConsumer; - } - - @Override - public boolean process(Exchange exchange, final AsyncCallback callback) { - // setup callback for after consumer - AsyncCallback delegate = doneSync -> { - if (afterConsumer != null) { - try { - afterConsumer.process(exchange); - } catch (Throwable e) { - exchange.setException(e); - } - } - callback.done(doneSync); - }; - - // perform any before consumer - if (beforeConsumer != null) { - try { - beforeConsumer.process(exchange); - } catch (Throwable e) { - exchange.setException(e); - callback.done(true); - return true; - } - } - - // process the consumer - return super.process(exchange, delegate); - } - - @Override - protected void doStart() throws Exception { - ServiceHelper.startServices(beforeConsumer, processor, afterConsumer); - } - - @Override - protected void doStop() throws Exception { - ServiceHelper.stopServices(beforeConsumer, processor, afterConsumer); - } - - @Override - protected void doSuspend() throws Exception { - ServiceHelper.suspendService(processor); - } - - @Override - protected void doResume() throws Exception { - ServiceHelper.resumeService(processor); - } - - @Override - protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(beforeConsumer, processor, afterConsumer); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/5b1b9266/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java index b5db55d..33f64c5 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java @@ -20,79 +20,51 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultAsyncProducer; -import org.apache.camel.util.AsyncProcessorConverterHelper; +import org.apache.camel.processor.Pipeline; import org.apache.camel.util.ServiceHelper; /** * Connector {@link Producer} which is capable of performing before and after custom processing - * while processing (ie sending the message). + * via the {@link Pipeline }while processing (ie sending the message). */ public class ConnectorProducer extends DefaultAsyncProducer { - private final AsyncProcessor producer; - private final Processor beforeProducer; - private final Processor afterProducer; + private final AsyncProcessor processor; - public ConnectorProducer(Endpoint endpoint, Producer producer, Processor beforeProducer, Processor afterProducer) { + public ConnectorProducer(Endpoint endpoint, Pipeline processor) { super(endpoint); - this.producer = AsyncProcessorConverterHelper.convert(producer); - this.beforeProducer = beforeProducer; - this.afterProducer = afterProducer; + this.processor = processor; } @Override public boolean process(Exchange exchange, final AsyncCallback callback) { - // setup callback for after producer - AsyncCallback delegate = doneSync -> { - if (afterProducer != null) { - try { - afterProducer.process(exchange); - } catch (Throwable e) { - exchange.setException(e); - } - } - callback.done(doneSync); - }; - - // perform any before producer - if (beforeProducer != null) { - try { - beforeProducer.process(exchange); - } catch (Throwable e) { - exchange.setException(e); - callback.done(true); - return true; - } - } - - return producer.process(exchange, delegate); + return processor.process(exchange, callback); } @Override protected void doStart() throws Exception { - ServiceHelper.startServices(beforeProducer, producer, afterProducer); + ServiceHelper.startServices(processor); } @Override protected void doStop() throws Exception { - ServiceHelper.stopServices(beforeProducer, producer, afterProducer); + ServiceHelper.stopServices(processor); } @Override protected void doSuspend() throws Exception { - ServiceHelper.suspendService(producer); + ServiceHelper.suspendService(processor); } @Override protected void doResume() throws Exception { - ServiceHelper.resumeService(producer); + ServiceHelper.resumeService(processor); } @Override protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(beforeProducer, producer, afterProducer); + ServiceHelper.stopAndShutdownServices(processor); } } http://git-wip-us.apache.org/repos/asf/camel/blob/5b1b9266/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java index cb254ce..c8158ad 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.connector; +import java.util.ArrayList; +import java.util.List; + import org.apache.camel.Consumer; import org.apache.camel.DelegateEndpoint; import org.apache.camel.Endpoint; @@ -24,6 +27,7 @@ import org.apache.camel.Producer; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.processor.Pipeline; import org.apache.camel.util.ServiceHelper; @ManagedResource(description = "Managed Connector Endpoint") @@ -44,13 +48,37 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat @Override public Producer createProducer() throws Exception { Producer producer = endpoint.createProducer(); - return new ConnectorProducer(endpoint, producer, getComponent().getBeforeProducer(), getComponent().getAfterProducer()); + + // use a pipeline to process before, producer, after in that order + List<Processor> list = new ArrayList<>(); + if (getComponent().getBeforeProducer() != null) { + list.add(getComponent().getBeforeProducer()); + } + list.add(producer); + if (getComponent().getAfterConsumer() != null) { + list.add(getComponent().getAfterProducer()); + } + + // create producer with the pipeline + Pipeline pipeline = new Pipeline(getCamelContext(), list); + return new ConnectorProducer(endpoint, pipeline); } @Override public Consumer createConsumer(Processor processor) throws Exception { - ConnectorConsumerProcessor delegate = new ConnectorConsumerProcessor(processor, getComponent().getBeforeConsumer(), getComponent().getAfterConsumer()); - Consumer consumer = endpoint.createConsumer(delegate); + // use a pipeline to process before, processor, after in that order + List<Processor> list = new ArrayList<>(); + if (getComponent().getBeforeConsumer() != null) { + list.add(getComponent().getBeforeConsumer()); + } + list.add(processor); + if (getComponent().getAfterConsumer() != null) { + list.add(getComponent().getAfterConsumer()); + } + + // create consumer with the pipeline + Pipeline pipeline = new Pipeline(getCamelContext(), list); + Consumer consumer = endpoint.createConsumer(pipeline); configureConsumer(consumer); return consumer; }