Repository: camel
Updated Branches:
  refs/heads/master 5104c1669 -> 1c8eda402


CAMEL-10308
Provide a way to use async engine from ProducerTemplate
style fixes, option rename & don't include out-of-Camel callbacks in 
processing time


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c8eda40
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c8eda40
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c8eda40

Branch: refs/heads/master
Commit: 1c8eda4020531e1e2ecf560fb1b91f1ddd435dd2
Parents: 51a8068
Author: Vitalii Tymchyshyn <v...@tym.im>
Authored: Fri Sep 16 20:39:27 2016 -0400
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Sep 17 09:34:52 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/ProducerTemplate.java |  23 ++--
 ...AsyncCallbackToCompletableFutureAdapter.java |   8 +-
 .../camel/impl/DefaultProducerTemplate.java     |  12 +-
 .../camel/impl/EventNotifierCallback.java       |  11 +-
 .../org/apache/camel/impl/ProducerCache.java    | 115 +++++++++----------
 ...ultProducerTemplateNonBlockingAsyncTest.java |  10 +-
 6 files changed, 85 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java 
b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
index 6461e4e..8858e74 100644
--- a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel;
 
-import org.apache.camel.spi.Synchronization;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -25,6 +23,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.camel.spi.Synchronization;
+
 /**
  * Template for working with Camel and sending {@link Message} instances in an
  * {@link Exchange} to an {@link Endpoint}.
@@ -94,19 +94,20 @@ public interface ProducerTemplate extends Service {
     int getCurrentCacheSize();
 
     /**
-     * Reports if async* methods will dispath processing from the calling 
thread (true) or through executor (false).
-     * They will still employ asynchronous engine, so this mode can be useful 
for high-speed non-blocking processing.
-     * @return if async* methods will run in the calling thread
+     * Reports if async* methods will dispatch processing from the calling 
thread (false) or through executor (true).
+     * In both cases the asynchronous engine will be used, so the non-threaded 
mode can be useful for high-speed
+     * non-blocking processing.
+     * @return if async* methods will dispatch processing with the executor
      */
-    boolean isSynchronous();
+    boolean isThreadedAsyncMode();
 
     /**
-     * Reports if async* methods will dispath processing from the calling 
thread (true) or through executor (false).
-     * In any case they would still employ asynchronous engine, so setting to 
true can be useful
-     * for high-speed non-blocking processing.
-     * @param synchronous if async* methods will run in the calling thread
+     * Sets whether async* methods will dispatch processing from the calling 
thread (false) or through executor (true).
+     * In both cases the asynchronous engine will be used, so the non-threaded 
mode can be useful for high-speed
+     * non-blocking processing.
+     * @param useExecutor if async* methods will dispatch processing with the 
executor
      */
-    void setSynchronous(boolean synchronous);
+    void setThreadedAsyncMode(boolean useExecutor);
     
     /**
      * Get the default endpoint to use if none is specified

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java
 
b/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java
index 0baa510..0576a10 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java
@@ -16,14 +16,14 @@
  */
 package org.apache.camel.impl;
 
-import org.apache.camel.AsyncCallback;
-
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.camel.AsyncCallback;
+
 /**
- * AsyncCallback that provides a CompletableFuture when async action is done
+ * AsyncCallback that provides a CompletableFuture completed when async action 
is done
  */
-public class AsyncCallbackToCompletableFutureAdapter<T> implements 
AsyncCallback{
+public class AsyncCallbackToCompletableFutureAdapter<T> implements 
AsyncCallback {
     private final CompletableFuture<T> future;
     private volatile T result;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
index 19958e4..a3dfd51 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
@@ -56,7 +56,7 @@ public class DefaultProducerTemplate extends ServiceSupport 
implements ProducerT
     private Endpoint defaultEndpoint;
     private int maximumCacheSize;
     private boolean eventNotifierEnabled = true;
-    private volatile boolean synchronous;
+    private volatile boolean threadedAsyncMode = true;
 
     public DefaultProducerTemplate(CamelContext camelContext) {
         this.camelContext = camelContext;
@@ -86,13 +86,13 @@ public class DefaultProducerTemplate extends ServiceSupport 
implements ProducerT
     }
 
     @Override
-    public boolean isSynchronous() {
-        return synchronous;
+    public boolean isThreadedAsyncMode() {
+        return threadedAsyncMode;
     }
 
     @Override
-    public void setSynchronous(boolean synchronous) {
-        this.synchronous = synchronous;
+    public void setThreadedAsyncMode(boolean useExecutor) {
+        this.threadedAsyncMode = useExecutor;
     }
 
     public int getCurrentCacheSize() {
@@ -705,7 +705,7 @@ public class DefaultProducerTemplate extends ServiceSupport 
implements ProducerT
             if (executor != null) {
                 return executor;
             }
-            if (synchronous) {
+            if (!threadedAsyncMode) {
                 executor = new SynchronousExecutorService();
             } else {
                 executor = 
camelContext.getExecutorServiceManager().newDefaultThreadPool(this, 
"ProducerTemplate");

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java 
b/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java
index 734551f..e186d9a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java
@@ -25,7 +25,7 @@ import org.apache.camel.util.StopWatch;
 /**
  * Helper class to notify on exchange sending events in async engine
  */
-class EventNotifierCallback implements AsyncCallback {
+public class EventNotifierCallback implements AsyncCallback {
     private final AsyncCallback originalCallback;
     private final StopWatch watch;
     private final Exchange exchange;
@@ -42,11 +42,8 @@ class EventNotifierCallback implements AsyncCallback {
 
     @Override
     public void done(boolean doneSync) {
-        try {
-            originalCallback.done(doneSync);
-        } finally {
-            long timeTaken = watch.stop();
-            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, 
endpoint, timeTaken);
-        }
+        long timeTaken = watch.stop();
+        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, 
endpoint, timeTaken);
+        originalCallback.done(doneSync);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index b475d29..1062029 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Cache containing created {@link Producer}.
  *
- * @version 
+ * @version
  */
 public class ProducerCache extends ServiceSupport {
     private static final Logger LOG = 
LoggerFactory.getLogger(ProducerCache.class);
@@ -277,7 +277,7 @@ public class ProducerCache extends ServiceSupport {
      *              or new one if parameter was null
      */
     public CompletableFuture<Exchange> asyncSend(Endpoint endpoint, 
ExchangePattern pattern, Processor processor, Processor resultProcessor,
-            CompletableFuture<Exchange> future) {
+                                                 CompletableFuture<Exchange> 
future) {
         return asyncSendExchange(endpoint, pattern, processor, 
resultProcessor, null, future);
     }
 
@@ -299,41 +299,37 @@ public class ProducerCache extends ServiceSupport {
      *              or new one if parameter was null
      */
     public CompletableFuture<Exchange> asyncSendExchange(final Endpoint 
endpoint, ExchangePattern pattern,
-            final Processor processor, final Processor resultProcessor, 
Exchange exchange,
-            CompletableFuture<Exchange> future) {
+                                                         final Processor 
processor, final Processor resultProcessor, Exchange exchange,
+                                                         
CompletableFuture<Exchange> future) {
         AsyncCallbackToCompletableFutureAdapter<Exchange> futureAdapter = new 
AsyncCallbackToCompletableFutureAdapter<>(future, exchange);
         doInAsyncProducer(endpoint, exchange, pattern, futureAdapter,
-                (producer, asyncProducer, innerExchange, exchangePattern, 
producerCallback) -> {
-                    if (innerExchange == null) {
-                        innerExchange = pattern != null ?
-                                producer.getEndpoint().createExchange(pattern) 
:
-                                producer.getEndpoint().createExchange();
-                        futureAdapter.setResult(innerExchange);
-                    }
+            (producer, asyncProducer, innerExchange, exchangePattern, 
producerCallback) -> {
+                if (innerExchange == null) {
+                    innerExchange = pattern != null
+                            ? producer.getEndpoint().createExchange(pattern)
+                            : producer.getEndpoint().createExchange();
+                    futureAdapter.setResult(innerExchange);
+                }
 
-                    if (processor != null) {
-                        // lets populate using the processor callback
-                        AsyncProcessor asyncProcessor = 
AsyncProcessorConverterHelper.convert(processor);
-                        try {
-                            final Exchange finalExchange = innerExchange;
-                            asyncProcessor.process(innerExchange, new 
AsyncCallback() {
-                                @Override
-                                public void done(boolean doneSync) {
-                                    asyncDispatchExchange(endpoint, producer, 
resultProcessor, finalExchange,
-                                            producerCallback);
-                                }
-                            });
-                            return false;
-                        } catch (Exception e) {
-                            // populate failed so return
-                            innerExchange.setException(e);
-                            producerCallback.done(true);
-                            return true;
-                        }
+                if (processor != null) {
+                    // lets populate using the processor callback
+                    AsyncProcessor asyncProcessor = 
AsyncProcessorConverterHelper.convert(processor);
+                    try {
+                        final Exchange finalExchange = innerExchange;
+                        asyncProcessor.process(innerExchange,
+                            doneSync -> asyncDispatchExchange(endpoint, 
producer, resultProcessor,
+                                    finalExchange, producerCallback));
+                        return false;
+                    } catch (Exception e) {
+                        // populate failed so return
+                        innerExchange.setException(e);
+                        producerCallback.done(true);
+                        return true;
                     }
+                }
 
-                    return asyncDispatchExchange(endpoint, producer, 
resultProcessor, innerExchange, producerCallback);
-                });
+                return asyncDispatchExchange(endpoint, producer, 
resultProcessor, innerExchange, producerCallback);
+            });
         return futureAdapter.getFuture();
     }
 
@@ -437,31 +433,28 @@ public class ProducerCache extends ServiceSupport {
             }
             // invoke the callback
             AsyncProcessor asyncProcessor = 
AsyncProcessorConverterHelper.convert(producer);
-            return producerCallback.doInAsyncProducer(producer, 
asyncProcessor, exchange, pattern, new AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    try {
-                        if (eventNotifierEnabled && watch != null) {
-                            long timeTaken = watch.stop();
-                            // emit event that the exchange was sent to the 
endpoint
-                            
EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, 
timeTaken);
-                        }
+            return producerCallback.doInAsyncProducer(producer, 
asyncProcessor, exchange, pattern, doneSync -> {
+                try {
+                    if (eventNotifierEnabled && watch != null) {
+                        long timeTaken = watch.stop();
+                        // emit event that the exchange was sent to the 
endpoint
+                        EventHelper.notifyExchangeSent(exchange.getContext(), 
exchange, endpoint, timeTaken);
+                    }
 
-                        if (producer instanceof ServicePoolAware) {
-                            // release back to the pool
-                            pool.release(endpoint, producer);
-                        } else if (!producer.isSingleton()) {
-                            // stop and shutdown non-singleton producers as we 
should not leak resources
-                            try {
-                                ServiceHelper.stopAndShutdownService(producer);
-                            } catch (Exception e) {
-                                // ignore and continue
-                                LOG.warn("Error stopping/shutting down 
producer: " + producer, e);
-                            }
+                    if (producer instanceof ServicePoolAware) {
+                        // release back to the pool
+                        pool.release(endpoint, producer);
+                    } else if (!producer.isSingleton()) {
+                        // stop and shutdown non-singleton producers as we 
should not leak resources
+                        try {
+                            ServiceHelper.stopAndShutdownService(producer);
+                        } catch (Exception e) {
+                            // ignore and continue
+                            LOG.warn("Error stopping/shutting down producer: " 
+ producer, e);
                         }
-                    } finally {
-                        callback.done(doneSync);
                     }
+                } finally {
+                    callback.done(doneSync);
                 }
             });
         } catch (Throwable e) {
@@ -475,7 +468,7 @@ public class ProducerCache extends ServiceSupport {
     }
 
     protected boolean asyncDispatchExchange(final Endpoint endpoint, Producer 
producer,
-            final Processor resultProcessor, Exchange exchange, AsyncCallback 
callback) {
+                                            final Processor resultProcessor, 
Exchange exchange, AsyncCallback callback) {
         // now lets dispatch
         LOG.debug(">>>> {} {}", endpoint, exchange);
 
@@ -659,7 +652,7 @@ public class ProducerCache extends ServiceSupport {
     public int getCapacity() {
         int capacity = -1;
         if (producers instanceof LRUCache) {
-            LRUCache<String, Producer> cache = (LRUCache<String, 
Producer>)producers;
+            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) 
producers;
             capacity = cache.getMaxCacheSize();
         }
         return capacity;
@@ -675,7 +668,7 @@ public class ProducerCache extends ServiceSupport {
     public long getHits() {
         long hits = -1;
         if (producers instanceof LRUCache) {
-            LRUCache<String, Producer> cache = (LRUCache<String, 
Producer>)producers;
+            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) 
producers;
             hits = cache.getHits();
         }
         return hits;
@@ -691,7 +684,7 @@ public class ProducerCache extends ServiceSupport {
     public long getMisses() {
         long misses = -1;
         if (producers instanceof LRUCache) {
-            LRUCache<String, Producer> cache = (LRUCache<String, 
Producer>)producers;
+            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) 
producers;
             misses = cache.getMisses();
         }
         return misses;
@@ -707,7 +700,7 @@ public class ProducerCache extends ServiceSupport {
     public long getEvicted() {
         long evicted = -1;
         if (producers instanceof LRUCache) {
-            LRUCache<String, Producer> cache = (LRUCache<String, 
Producer>)producers;
+            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) 
producers;
             evicted = cache.getEvicted();
         }
         return evicted;
@@ -718,7 +711,7 @@ public class ProducerCache extends ServiceSupport {
      */
     public void resetCacheStatistics() {
         if (producers instanceof LRUCache) {
-            LRUCache<String, Producer> cache = (LRUCache<String, 
Producer>)producers;
+            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) 
producers;
             cache.resetStatistics();
         }
         if (statistics != null) {
@@ -742,7 +735,7 @@ public class ProducerCache extends ServiceSupport {
      */
     public void cleanUp() {
         if (producers instanceof LRUCache) {
-            LRUCache<String, Producer> cache = (LRUCache<String, 
Producer>)producers;
+            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) 
producers;
             cache.cleanUp();
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java
index f218a6e..17d32a1 100644
--- 
a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java
@@ -16,21 +16,21 @@
  */
 package org.apache.camel.impl;
 
-import org.apache.camel.Exchange;
-import org.junit.Assert;
-
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.camel.Exchange;
+import org.junit.Assert;
+
 /**
  * @version
  */
-public class DefaultProducerTemplateNonBlockingAsyncTest extends 
DefaultProducerTemplateAsyncTest{
+public class DefaultProducerTemplateNonBlockingAsyncTest extends 
DefaultProducerTemplateAsyncTest {
     @Override
     protected void setUp() throws Exception {
         super.setUp();
         template.stop();
-        template.setSynchronous(true);
+        template.setThreadedAsyncMode(false);
         template.start();
     }
 

Reply via email to