Repository: camel
Updated Branches:
  refs/heads/master e007a6c2c -> e37cf3fde


camel-connector before/after should support the async routing engine via a pipeline, which is also simpler code

Modifies the `DefaultConnectorEndpoint` to create `Pipeline`-based
`Consumer`s and `Producer`s. An additional helper method was added to
`Pipeline` to create a pipeline for such use cases.

Cleans up `Pipeline` by adding `@Override` annotations and removing the
`implements` declarations of interfaces that are already inherited from
`MulticastProcessor`.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e37cf3fd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e37cf3fd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e37cf3fd

Branch: refs/heads/master
Commit: e37cf3fde55946973dd121cfb48ce7ae4806adc2
Parents: e007a6c
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Oct 6 10:14:28 2017 +0200
Committer: Zoran Regvart <zregv...@apache.org>
Committed: Fri Oct 6 11:58:21 2017 +0200

----------------------------------------------------------------------
 .../org/apache/camel/processor/Pipeline.java    | 33 +++++--
 .../connector/ConnectorConsumerProcessor.java   | 94 --------------------
 .../component/connector/ConnectorProducer.java  | 50 +++--------
 .../connector/DefaultConnectorEndpoint.java     | 25 ++++--
 4 files changed, 59 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e37cf3fd/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java 
b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
index b35252d..e75c20c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -25,8 +26,6 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.Traceable;
-import org.apache.camel.spi.IdAware;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
@@ -41,7 +40,7 @@ import static 
org.apache.camel.processor.PipelineHelper.continueProcessing;
  *
  * @version 
  */
-public class Pipeline extends MulticastProcessor implements AsyncProcessor, 
Traceable, IdAware {
+public class Pipeline extends MulticastProcessor {
     private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
 
     private String id;
@@ -59,10 +58,29 @@ public class Pipeline extends MulticastProcessor implements 
AsyncProcessor, Trac
         return new Pipeline(camelContext, processors);
     }
 
+    public static Processor newInstance(final CamelContext camelContext, final 
Processor... processors) {
+        if (processors == null || processors.length == 0) {
+            return null;
+        } else if (processors.length == 1) {
+            return processors[0];
+        }
+
+        final List<Processor> toBeProcessed = new 
ArrayList<>(processors.length);
+        for (Processor processor : processors) {
+            if (processor != null) {
+                toBeProcessed.add(processor);
+            }
+        }
+
+        return new Pipeline(camelContext, toBeProcessed);
+    }
+
+    @Override
     public void process(Exchange exchange) throws Exception {
         AsyncProcessorHelper.process(this, exchange);
     }
 
+    @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         Iterator<Processor> processors = getProcessors().iterator();
         Exchange nextExchange = exchange;
@@ -118,7 +136,8 @@ public class Pipeline extends MulticastProcessor implements 
AsyncProcessor, Trac
         // implement asynchronous routing logic in callback so we can have the 
callback being
         // triggered and then continue routing where we left
         boolean sync = asyncProcessor.process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
+            @Override
+            public void done(final boolean doneSync) {
                 // we only have to handle async completion of the pipeline
                 if (doneSync) {
                     return;
@@ -135,8 +154,8 @@ public class Pipeline extends MulticastProcessor implements 
AsyncProcessor, Trac
                     }
 
                     nextExchange = createNextExchange(nextExchange);
-                    doneSync = process(original, nextExchange, callback, 
processors, processor);
-                    if (!doneSync) {
+                    boolean isDoneSync = process(original, nextExchange, 
callback, processors, processor);
+                    if (!isDoneSync) {
                         LOG.trace("Processing exchangeId: {} is continued 
being processed asynchronously", exchange.getExchangeId());
                         return;
                     }
@@ -192,10 +211,12 @@ public class Pipeline extends MulticastProcessor 
implements AsyncProcessor, Trac
         return "pipeline";
     }
 
+    @Override
     public String getId() {
         return id;
     }
 
+    @Override
     public void setId(String id) {
         this.id = id;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e37cf3fd/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
----------------------------------------------------------------------
diff --git 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
deleted file mode 100644
index 53c3860..0000000
--- 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.connector;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.processor.DelegateAsyncProcessor;
-import org.apache.camel.util.ServiceHelper;
-
-/**
- * Connector {@link Processor} which is capable of performing before and after 
custom processing
- * while consuming a message (ie from the consumer).
- */
-public class ConnectorConsumerProcessor extends DelegateAsyncProcessor {
-
-    private final Processor beforeConsumer;
-    private final Processor afterConsumer;
-
-    public ConnectorConsumerProcessor(Processor processor, Processor 
beforeConsumer, Processor afterConsumer) {
-        super(processor);
-        this.beforeConsumer = beforeConsumer;
-        this.afterConsumer = afterConsumer;
-    }
-
-    @Override
-    public boolean process(Exchange exchange, final AsyncCallback callback) {
-        // setup callback for after consumer
-        AsyncCallback delegate = doneSync -> {
-            if (afterConsumer != null) {
-                try {
-                    afterConsumer.process(exchange);
-                } catch (Throwable e) {
-                    exchange.setException(e);
-                }
-            }
-            callback.done(doneSync);
-        };
-
-        // perform any before consumer
-        if (beforeConsumer != null) {
-            try {
-                beforeConsumer.process(exchange);
-            } catch (Throwable e) {
-                exchange.setException(e);
-                callback.done(true);
-                return true;
-            }
-        }
-
-        // process the consumer
-        return super.process(exchange, delegate);
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        ServiceHelper.startServices(beforeConsumer, processor, afterConsumer);
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        ServiceHelper.stopServices(beforeConsumer, processor, afterConsumer);
-    }
-
-    @Override
-    protected void doSuspend() throws Exception {
-        ServiceHelper.suspendService(processor);
-    }
-
-    @Override
-    protected void doResume() throws Exception {
-        ServiceHelper.resumeService(processor);
-    }
-
-    @Override
-    protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(beforeConsumer, processor, 
afterConsumer);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/e37cf3fd/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
----------------------------------------------------------------------
diff --git 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
index b5db55d..28e80d0 100644
--- 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
+++ 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
@@ -23,76 +23,50 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.processor.Pipeline;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.ServiceHelper;
 
 /**
  * Connector {@link Producer} which is capable of performing before and after 
custom processing
- * while processing (ie sending the message).
+ * via the {@link Pipeline} while processing (ie sending the message).
  */
 public class ConnectorProducer extends DefaultAsyncProducer {
 
-    private final AsyncProcessor producer;
-    private final Processor beforeProducer;
-    private final Processor afterProducer;
+    private final AsyncProcessor processor;
 
-    public ConnectorProducer(Endpoint endpoint, Producer producer, Processor 
beforeProducer, Processor afterProducer) {
+    public ConnectorProducer(final Endpoint endpoint, final Processor 
processor) {
         super(endpoint);
-        this.producer = AsyncProcessorConverterHelper.convert(producer);
-        this.beforeProducer = beforeProducer;
-        this.afterProducer = afterProducer;
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
     @Override
-    public boolean process(Exchange exchange, final AsyncCallback callback) {
-        // setup callback for after producer
-        AsyncCallback delegate = doneSync -> {
-            if (afterProducer != null) {
-                try {
-                    afterProducer.process(exchange);
-                } catch (Throwable e) {
-                    exchange.setException(e);
-                }
-            }
-            callback.done(doneSync);
-        };
-
-        // perform any before producer
-        if (beforeProducer != null) {
-            try {
-                beforeProducer.process(exchange);
-            } catch (Throwable e) {
-                exchange.setException(e);
-                callback.done(true);
-                return true;
-            }
-        }
-
-        return producer.process(exchange, delegate);
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        return processor.process(exchange, callback);
     }
 
     @Override
     protected void doStart() throws Exception {
-        ServiceHelper.startServices(beforeProducer, producer, afterProducer);
+        ServiceHelper.startServices(processor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(beforeProducer, producer, afterProducer);
+        ServiceHelper.stopServices(processor);
     }
 
     @Override
     protected void doSuspend() throws Exception {
-        ServiceHelper.suspendService(producer);
+        ServiceHelper.suspendService(processor);
     }
 
     @Override
     protected void doResume() throws Exception {
-        ServiceHelper.resumeService(producer);
+        ServiceHelper.resumeService(processor);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(beforeProducer, producer, 
afterProducer);
+        ServiceHelper.stopAndShutdownServices(processor);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e37cf3fd/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
index cb254ce..9e68f0f 100644
--- 
a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
+++ 
b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.processor.Pipeline;
 import org.apache.camel.util.ServiceHelper;
 
 @ManagedResource(description = "Managed Connector Endpoint")
@@ -43,15 +44,29 @@ public class DefaultConnectorEndpoint extends 
DefaultEndpoint implements Delegat
 
     @Override
     public Producer createProducer() throws Exception {
-        Producer producer = endpoint.createProducer();
-        return new ConnectorProducer(endpoint, producer, 
getComponent().getBeforeProducer(), getComponent().getAfterProducer());
+        final Producer producer = endpoint.createProducer();
+
+        final Processor beforeProducer = getComponent().getBeforeProducer();
+        final Processor afterProducer = getComponent().getAfterProducer();
+
+        // use a pipeline to process before, producer, after in that order
+        // create producer with the pipeline
+        final Processor pipeline = Pipeline.newInstance(getCamelContext(), 
beforeProducer, producer, afterProducer);
+
+        return new ConnectorProducer(endpoint, pipeline);
     }
 
     @Override
-    public Consumer createConsumer(Processor processor) throws Exception {
-        ConnectorConsumerProcessor delegate = new 
ConnectorConsumerProcessor(processor, getComponent().getBeforeConsumer(), 
getComponent().getAfterConsumer());
-        Consumer consumer = endpoint.createConsumer(delegate);
+    public Consumer createConsumer(final Processor processor) throws Exception 
{
+        final Processor beforeConsumer = getComponent().getBeforeConsumer();
+        final Processor afterConsumer = getComponent().getAfterConsumer();
+
+        // use a pipeline to process before, processor, after in that order
+        // create consumer with the pipeline
+        final Processor pipeline = Pipeline.newInstance(getCamelContext(), 
beforeConsumer, processor, afterConsumer);
+        final Consumer consumer = endpoint.createConsumer(pipeline);
         configureConsumer(consumer);
+
         return consumer;
     }
 

Reply via email to