Repository: camel
Updated Branches:
  refs/heads/master 38c3cfa50 -> 3e1f66d04


 Camel connector now allows custom logic to run before the producer or
consumer does anything.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3e1f66d0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3e1f66d0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3e1f66d0

Branch: refs/heads/master
Commit: 3e1f66d04d352a890bc591e99afd3a94d51e5499
Parents: 38c3cfa
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Apr 7 15:13:59 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Apr 7 15:13:59 2017 +0200

----------------------------------------------------------------------
 .../connector/ConnectorConsumerProcessor.java   | 69 ++++++++++++++++++++
 .../component/connector/ConnectorProducer.java  |  4 ++
 .../connector/DefaultConnectorEndpoint.java     |  4 +-
 3 files changed, 75 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3e1f66d0/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
----------------------------------------------------------------------
diff --git 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
new file mode 100644
index 0000000..b8d6eaa
--- /dev/null
+++ 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.connector;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.support.SynchronizationAdapter;
+
+/**
+ * Connector {@link Processor} that wraps the processor handed to the consumer so that custom
+ * logic can run before the message is passed on and after the route has finished.
+ * <p/>
+ * Either hook may be <tt>null</tt>, in which case it is skipped.
+ */
+public class ConnectorConsumerProcessor extends DelegateAsyncProcessor {
+
+    private final Processor beforeConsumer;
+    private final Processor afterConsumer;
+
+    /**
+     * @param processor      the processor to delegate to once the before hook has succeeded
+     * @param beforeConsumer optional hook invoked before delegating, may be <tt>null</tt>
+     * @param afterConsumer  optional hook invoked from the <tt>onAfterRoute</tt> callback, may be <tt>null</tt>
+     */
+    public ConnectorConsumerProcessor(Processor processor, Processor beforeConsumer, Processor afterConsumer) {
+        super(processor);
+        this.beforeConsumer = beforeConsumer;
+        this.afterConsumer = afterConsumer;
+    }
+
+    /**
+     * Registers the after hook, runs the before hook and then delegates to the wrapped processor.
+     * <p/>
+     * If the before hook throws, the failure is stored on the exchange, the callback is signalled
+     * as done synchronously and the wrapped processor is not invoked.
+     *
+     * @param exchange the exchange being consumed
+     * @param callback the callback to notify when processing is done
+     * @return <tt>true</tt> if the before hook failed and the callback was completed synchronously,
+     *         otherwise whatever the wrapped processor returns
+     */
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (afterConsumer != null) {
+            // register via the exchange instead of exchange.getUnitOfWork(): the consumer may hand over
+            // the exchange before a unit of work has been attached, and dereferencing it directly would
+            // then fail with a NullPointerException
+            exchange.addOnCompletion(new SynchronizationAdapter() {
+                @Override
+                public void onAfterRoute(Route route, Exchange exchange) {
+                    try {
+                        afterConsumer.process(exchange);
+                    } catch (Exception ignored) {
+                        // best effort: the route has already finished at this point, so a failing
+                        // after hook must not alter the outcome of the exchange
+                    }
+                }
+            });
+        }
+
+        if (beforeConsumer != null) {
+            try {
+                beforeConsumer.process(exchange);
+            } catch (Throwable e) {
+                // fail fast: report the error on the exchange and complete without routing
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+        }
+
+        return super.process(exchange, callback);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3e1f66d0/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
----------------------------------------------------------------------
diff --git 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
index b0d7225..2904ac6 100644
--- 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
+++ 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
@@ -25,6 +25,10 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ServiceHelper;
 
+/**
+ * Connector {@link Producer} which is capable of performing before and after custom processing
+ * while processing (ie sending the message).
+ */
 public class ConnectorProducer extends DefaultProducer {
 
     private final Producer producer;

http://git-wip-us.apache.org/repos/asf/camel/blob/3e1f66d0/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
index ba9e93b..ac9b2a9 100644
--- 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
+++ 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
@@ -49,8 +49,8 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint 
implements Delegat
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Consumer answer = endpoint.createConsumer(processor);
-        return answer;
+        ConnectorConsumerProcessor delegate = new ConnectorConsumerProcessor(processor, getComponent().getBeforeConsumer(), getComponent().getAfterConsumer());
+        return endpoint.createConsumer(delegate);
     }
 
     @Override

Reply via email to