Repository: camel Updated Branches: refs/heads/master 3e1f66d04 -> bef3d9224
Camel connector: allow custom logic to run before the producer or consumer does anything. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bef3d922 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bef3d922 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bef3d922 Branch: refs/heads/master Commit: bef3d9224d73fa54d67f3c3f8f572ba5062efad4 Parents: 3e1f66d Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Apr 7 15:32:18 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Apr 7 15:32:18 2017 +0200 ---------------------------------------------------------------------- .../connector/ConnectorConsumerProcessor.java | 29 +++++++------ .../component/connector/ConnectorProducer.java | 44 +++++++++++++------- .../connector/DefaultConnectorEndpoint.java | 4 +- 3 files changed, 45 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bef3d922/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java index b8d6eaa..aa717ec 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java @@ -19,9 +19,7 @@ package org.apache.camel.component.connector; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.Route; import org.apache.camel.processor.DelegateAsyncProcessor; -import 
org.apache.camel.support.SynchronizationAdapter; /** * Connector {@link Processor} which is capable of performing before and after custom processing @@ -39,20 +37,20 @@ public class ConnectorConsumerProcessor extends DelegateAsyncProcessor { } @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - if (afterConsumer != null) { - exchange.getUnitOfWork().addSynchronization(new SynchronizationAdapter() { - @Override - public void onAfterRoute(Route route, Exchange exchange) { - try { - afterConsumer.process(exchange); - } catch (Exception e) { - // ignore - } + public boolean process(Exchange exchange, final AsyncCallback callback) { + // setup callback for after consumer + AsyncCallback delegate = doneSync -> { + if (afterConsumer != null) { + try { + afterConsumer.process(exchange); + } catch (Throwable e) { + exchange.setException(e); } - }); - } + } + callback.done(doneSync); + }; + // perform any before consumer if (beforeConsumer != null) { try { beforeConsumer.process(exchange); @@ -63,7 +61,8 @@ public class ConnectorConsumerProcessor extends DelegateAsyncProcessor { } } - return super.process(exchange, callback); + // process the consumer + return super.process(exchange, delegate); } } http://git-wip-us.apache.org/repos/asf/camel/blob/bef3d922/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java index 2904ac6..2cdc1d7 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java @@ -16,47 +16,59 @@ */ package org.apache.camel.component.connector; 
-import java.util.concurrent.RejectedExecutionException; - +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.ServiceHelper; /** * Connector {@link Producer} which is capable of performing before and after custom processing * while processing (ie sending the message). */ -public class ConnectorProducer extends DefaultProducer { +public class ConnectorProducer extends DefaultAsyncProducer { - private final Producer producer; + private final AsyncProcessor producer; private final Processor beforeProducer; private final Processor afterProducer; public ConnectorProducer(Endpoint endpoint, Producer producer, Processor beforeProducer, Processor afterProducer) { super(endpoint); - this.producer = producer; + this.producer = AsyncProcessorConverterHelper.convert(producer); this.beforeProducer = beforeProducer; this.afterProducer = afterProducer; } @Override - public void process(Exchange exchange) throws Exception { - if (!isRunAllowed()) { - throw new RejectedExecutionException(); - } + public boolean process(Exchange exchange, final AsyncCallback callback) { + // setup callback for after producer + AsyncCallback delegate = doneSync -> { + if (afterProducer != null) { + try { + afterProducer.process(exchange); + } catch (Throwable e) { + exchange.setException(e); + } + } + callback.done(doneSync); + }; + // perform any before producer if (beforeProducer != null) { - beforeProducer.process(exchange); + try { + beforeProducer.process(exchange); + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; + } } - producer.process(exchange); - - if (afterProducer != null) { - 
afterProducer.process(exchange); - } + return producer.process(exchange, delegate); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/bef3d922/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java index ac9b2a9..cb254ce 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java @@ -50,7 +50,9 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat @Override public Consumer createConsumer(Processor processor) throws Exception { ConnectorConsumerProcessor delegate = new ConnectorConsumerProcessor(processor, getComponent().getBeforeConsumer(), getComponent().getAfterConsumer()); - return endpoint.createConsumer(delegate); + Consumer consumer = endpoint.createConsumer(delegate); + configureConsumer(consumer); + return consumer; } @Override