Repository: camel
Updated Branches:
  refs/heads/master 3e1f66d04 -> bef3d9224


Camel connector: allow custom logic to run before the producer or consumer
does anything.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bef3d922
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bef3d922
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bef3d922

Branch: refs/heads/master
Commit: bef3d9224d73fa54d67f3c3f8f572ba5062efad4
Parents: 3e1f66d
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Apr 7 15:32:18 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Apr 7 15:32:18 2017 +0200

----------------------------------------------------------------------
 .../connector/ConnectorConsumerProcessor.java   | 29 +++++++------
 .../component/connector/ConnectorProducer.java  | 44 +++++++++++++-------
 .../connector/DefaultConnectorEndpoint.java     |  4 +-
 3 files changed, 45 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bef3d922/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
index b8d6eaa..aa717ec 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
@@ -19,9 +19,7 @@ package org.apache.camel.component.connector;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.Route;
 import org.apache.camel.processor.DelegateAsyncProcessor;
-import org.apache.camel.support.SynchronizationAdapter;
 
 /**
  * Connector {@link Processor} which is capable of performing before and after 
custom processing
@@ -39,20 +37,20 @@ public class ConnectorConsumerProcessor extends 
DelegateAsyncProcessor {
     }
 
     @Override
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        if (afterConsumer != null) {
-            exchange.getUnitOfWork().addSynchronization(new 
SynchronizationAdapter() {
-                @Override
-                public void onAfterRoute(Route route, Exchange exchange) {
-                    try {
-                        afterConsumer.process(exchange);
-                    } catch (Exception e) {
-                        // ignore
-                    }
+    public boolean process(Exchange exchange, final AsyncCallback callback) {
+        // setup callback for after consumer
+        AsyncCallback delegate = doneSync -> {
+            if (afterConsumer != null) {
+                try {
+                    afterConsumer.process(exchange);
+                } catch (Throwable e) {
+                    exchange.setException(e);
                 }
-            });
-        }
+            }
+            callback.done(doneSync);
+        };
 
+        // perform any before consumer
         if (beforeConsumer != null) {
             try {
                 beforeConsumer.process(exchange);
@@ -63,7 +61,8 @@ public class ConnectorConsumerProcessor extends 
DelegateAsyncProcessor {
             }
         }
 
-        return super.process(exchange, callback);
+        // process the consumer
+        return super.process(exchange, delegate);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/bef3d922/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
index 2904ac6..2cdc1d7 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
@@ -16,47 +16,59 @@
  */
 package org.apache.camel.component.connector;
 
-import java.util.concurrent.RejectedExecutionException;
-
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.ServiceHelper;
 
 /**
  * Connector {@link Producer} which is capable of performing before and after 
custom processing
  * while processing (ie sending the message).
  */
-public class ConnectorProducer extends DefaultProducer {
+public class ConnectorProducer extends DefaultAsyncProducer {
 
-    private final Producer producer;
+    private final AsyncProcessor producer;
     private final Processor beforeProducer;
     private final Processor afterProducer;
 
     public ConnectorProducer(Endpoint endpoint, Producer producer, Processor 
beforeProducer, Processor afterProducer) {
         super(endpoint);
-        this.producer = producer;
+        this.producer = AsyncProcessorConverterHelper.convert(producer);
         this.beforeProducer = beforeProducer;
         this.afterProducer = afterProducer;
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
-        if (!isRunAllowed()) {
-            throw new RejectedExecutionException();
-        }
+    public boolean process(Exchange exchange, final AsyncCallback callback) {
+        // setup callback for after producer
+        AsyncCallback delegate = doneSync -> {
+            if (afterProducer != null) {
+                try {
+                    afterProducer.process(exchange);
+                } catch (Throwable e) {
+                    exchange.setException(e);
+                }
+            }
+            callback.done(doneSync);
+        };
 
+        // perform any before producer
         if (beforeProducer != null) {
-            beforeProducer.process(exchange);
+            try {
+                beforeProducer.process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
         }
 
-        producer.process(exchange);
-
-        if (afterProducer != null) {
-            afterProducer.process(exchange);
-        }
+        return producer.process(exchange, delegate);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/bef3d922/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
index ac9b2a9..cb254ce 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
@@ -50,7 +50,9 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint 
implements Delegat
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
         ConnectorConsumerProcessor delegate = new 
ConnectorConsumerProcessor(processor, getComponent().getBeforeConsumer(), 
getComponent().getAfterConsumer());
-        return endpoint.createConsumer(delegate);
+        Consumer consumer = endpoint.createConsumer(delegate);
+        configureConsumer(consumer);
+        return consumer;
     }
 
     @Override

Reply via email to