Repository: camel Updated Branches: refs/heads/master 38c3cfa50 -> 3e1f66d04
Camel connector now allows custom logic to run before the producer or consumer does anything. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3e1f66d0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3e1f66d0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3e1f66d0 Branch: refs/heads/master Commit: 3e1f66d04d352a890bc591e99afd3a94d51e5499 Parents: 38c3cfa Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Apr 7 15:13:59 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Apr 7 15:13:59 2017 +0200 ---------------------------------------------------------------------- .../connector/ConnectorConsumerProcessor.java | 69 ++++++++++++++++++++ .../component/connector/ConnectorProducer.java | 4 ++ .../connector/DefaultConnectorEndpoint.java | 4 +- 3 files changed, 75 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3e1f66d0/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java new file mode 100644 index 0000000..b8d6eaa --- /dev/null +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.connector; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.processor.DelegateAsyncProcessor; +import org.apache.camel.support.SynchronizationAdapter; + +/** + * Connector {@link Processor} which is capable of performing before and after custom processing + * while consuming a message (ie from the consumer). 
+ */ +public class ConnectorConsumerProcessor extends DelegateAsyncProcessor { + + private final Processor beforeConsumer; + private final Processor afterConsumer; + + public ConnectorConsumerProcessor(Processor processor, Processor beforeConsumer, Processor afterConsumer) { + super(processor); + this.beforeConsumer = beforeConsumer; + this.afterConsumer = afterConsumer; + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + if (afterConsumer != null) { + exchange.getUnitOfWork().addSynchronization(new SynchronizationAdapter() { + @Override + public void onAfterRoute(Route route, Exchange exchange) { + try { + afterConsumer.process(exchange); + } catch (Exception e) { + // ignore + } + } + }); + } + + if (beforeConsumer != null) { + try { + beforeConsumer.process(exchange); + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; + } + } + + return super.process(exchange, callback); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/3e1f66d0/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java index b0d7225..2904ac6 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java @@ -25,6 +25,10 @@ import org.apache.camel.Producer; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.ServiceHelper; +/** + * Connector {@link Producer} which is capable of performing before and after custom processing + * while processing (ie sending the message). 
+ */ public class ConnectorProducer extends DefaultProducer { private final Producer producer; http://git-wip-us.apache.org/repos/asf/camel/blob/3e1f66d0/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java index ba9e93b..ac9b2a9 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java @@ -49,8 +49,8 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat @Override public Consumer createConsumer(Processor processor) throws Exception { - Consumer answer = endpoint.createConsumer(processor); - return answer; + ConnectorConsumerProcessor delegate = new ConnectorConsumerProcessor(processor, getComponent().getBeforeConsumer(), getComponent().getAfterConsumer()); + return endpoint.createConsumer(delegate); } @Override