Repository: camel Updated Branches: refs/heads/master 5104c1669 -> 1c8eda402
CAMEL-10308 Provide a way to use async engine from ProducerTemplate: style fixes, optional rename & don't include out-of-Camel callbacks in the processing time Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c8eda40 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c8eda40 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c8eda40 Branch: refs/heads/master Commit: 1c8eda4020531e1e2ecf560fb1b91f1ddd435dd2 Parents: 51a8068 Author: Vitalii Tymchyshyn <v...@tym.im> Authored: Fri Sep 16 20:39:27 2016 -0400 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Sep 17 09:34:52 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/ProducerTemplate.java | 23 ++-- ...AsyncCallbackToCompletableFutureAdapter.java | 8 +- .../camel/impl/DefaultProducerTemplate.java | 12 +- .../camel/impl/EventNotifierCallback.java | 11 +- .../org/apache/camel/impl/ProducerCache.java | 115 +++++++++---------- ...ultProducerTemplateNonBlockingAsyncTest.java | 10 +- 6 files changed, 85 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java index 6461e4e..8858e74 100644 --- a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java +++ b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java @@ -16,8 +16,6 @@ */ package org.apache.camel; -import org.apache.camel.spi.Synchronization; - import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -25,6 +23,8 @@ import java.util.concurrent.Future; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.camel.spi.Synchronization; + /** * Template for working with Camel and sending {@link Message} instances in an * {@link Exchange} to an {@link Endpoint}. @@ -94,19 +94,20 @@ public interface ProducerTemplate extends Service { int getCurrentCacheSize(); /** - * Reports if async* methods will dispath processing from the calling thread (true) or through executor (false). - * They will still employ asynchronous engine, so this mode can be useful for high-speed non-blocking processing. - * @return if async* methods will run in the calling thread + * Reports if async* methods will dispatch processing from the calling thread (false) or through executor (true). + * In both cases asynchronous engine will be used, so this non-threaded mode can be useful for high-speed + * non-blocking processing. + * @return if async* methods will dispatch processing with the executor */ - boolean isSynchronous(); + boolean isThreadedAsyncMode(); /** - * Reports if async* methods will dispath processing from the calling thread (true) or through executor (false). - * In any case they would still employ asynchronous engine, so setting to true can be useful - * for high-speed non-blocking processing. - * @param synchronous if async* methods will run in the calling thread + * Sets whether async* methods will dispatch processing from the calling thread (false) or through executor (true). + * In both cases asynchronous engine will be used, so this non-threaded mode can be useful for high-speed + * non-blocking processing. 
+ * @param useExecutor if async* methods will dispatch processing with the executor */ - void setSynchronous(boolean synchronous); + void setThreadedAsyncMode(boolean useExecutor); /** * Get the default endpoint to use if none is specified http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java b/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java index 0baa510..0576a10 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java +++ b/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java @@ -16,14 +16,14 @@ */ package org.apache.camel.impl; -import org.apache.camel.AsyncCallback; - import java.util.concurrent.CompletableFuture; +import org.apache.camel.AsyncCallback; + /** - * AsyncCallback that provides a CompletableFuture when async action is done + * AsyncCallback that provides a CompletableFuture that is completed when the async action is done */ -public class AsyncCallbackToCompletableFutureAdapter<T> implements AsyncCallback{ +public class AsyncCallbackToCompletableFutureAdapter<T> implements AsyncCallback { private final CompletableFuture<T> future; private volatile T result; http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java index 19958e4..a3dfd51 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java +++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java @@ -56,7 +56,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT private Endpoint defaultEndpoint; private int maximumCacheSize; private boolean eventNotifierEnabled = true; - private volatile boolean synchronous; + private volatile boolean threadedAsyncMode = true; public DefaultProducerTemplate(CamelContext camelContext) { this.camelContext = camelContext; @@ -86,13 +86,13 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT } @Override - public boolean isSynchronous() { - return synchronous; + public boolean isThreadedAsyncMode() { + return threadedAsyncMode; } @Override - public void setSynchronous(boolean synchronous) { - this.synchronous = synchronous; + public void setThreadedAsyncMode(boolean useExecutor) { + this.threadedAsyncMode = useExecutor; } public int getCurrentCacheSize() { @@ -705,7 +705,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT if (executor != null) { return executor; } - if (synchronous) { + if (!threadedAsyncMode) { executor = new SynchronousExecutorService(); } else { executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "ProducerTemplate"); http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java b/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java index 734551f..e186d9a 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java +++ b/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java @@ -25,7 +25,7 @@ import org.apache.camel.util.StopWatch; /** * Helper class to notify on exchange sending events in async engine */ -class 
EventNotifierCallback implements AsyncCallback { +public class EventNotifierCallback implements AsyncCallback { private final AsyncCallback originalCallback; private final StopWatch watch; private final Exchange exchange; @@ -42,11 +42,8 @@ class EventNotifierCallback implements AsyncCallback { @Override public void done(boolean doneSync) { - try { - originalCallback.done(doneSync); - } finally { - long timeTaken = watch.stop(); - EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); - } + long timeTaken = watch.stop(); + EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); + originalCallback.done(doneSync); } } http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java index b475d29..1062029 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory; /** * Cache containing created {@link Producer}. 
* - * @version + * @version */ public class ProducerCache extends ServiceSupport { private static final Logger LOG = LoggerFactory.getLogger(ProducerCache.class); @@ -277,7 +277,7 @@ public class ProducerCache extends ServiceSupport { * or new one if parameter was null */ public CompletableFuture<Exchange> asyncSend(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor, - CompletableFuture<Exchange> future) { + CompletableFuture<Exchange> future) { return asyncSendExchange(endpoint, pattern, processor, resultProcessor, null, future); } @@ -299,41 +299,37 @@ public class ProducerCache extends ServiceSupport { * or new one if parameter was null */ public CompletableFuture<Exchange> asyncSendExchange(final Endpoint endpoint, ExchangePattern pattern, - final Processor processor, final Processor resultProcessor, Exchange exchange, - CompletableFuture<Exchange> future) { + final Processor processor, final Processor resultProcessor, Exchange exchange, + CompletableFuture<Exchange> future) { AsyncCallbackToCompletableFutureAdapter<Exchange> futureAdapter = new AsyncCallbackToCompletableFutureAdapter<>(future, exchange); doInAsyncProducer(endpoint, exchange, pattern, futureAdapter, - (producer, asyncProducer, innerExchange, exchangePattern, producerCallback) -> { - if (innerExchange == null) { - innerExchange = pattern != null ? - producer.getEndpoint().createExchange(pattern) : - producer.getEndpoint().createExchange(); - futureAdapter.setResult(innerExchange); - } + (producer, asyncProducer, innerExchange, exchangePattern, producerCallback) -> { + if (innerExchange == null) { + innerExchange = pattern != null + ? 
producer.getEndpoint().createExchange(pattern) + : producer.getEndpoint().createExchange(); + futureAdapter.setResult(innerExchange); + } - if (processor != null) { - // lets populate using the processor callback - AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(processor); - try { - final Exchange finalExchange = innerExchange; - asyncProcessor.process(innerExchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - asyncDispatchExchange(endpoint, producer, resultProcessor, finalExchange, - producerCallback); - } - }); - return false; - } catch (Exception e) { - // populate failed so return - innerExchange.setException(e); - producerCallback.done(true); - return true; - } + if (processor != null) { + // lets populate using the processor callback + AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(processor); + try { + final Exchange finalExchange = innerExchange; + asyncProcessor.process(innerExchange, + doneSync -> asyncDispatchExchange(endpoint, producer, resultProcessor, + finalExchange, producerCallback)); + return false; + } catch (Exception e) { + // populate failed so return + innerExchange.setException(e); + producerCallback.done(true); + return true; } + } - return asyncDispatchExchange(endpoint, producer, resultProcessor, innerExchange, producerCallback); - }); + return asyncDispatchExchange(endpoint, producer, resultProcessor, innerExchange, producerCallback); + }); return futureAdapter.getFuture(); } @@ -437,31 +433,28 @@ public class ProducerCache extends ServiceSupport { } // invoke the callback AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer); - return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - try { - if (eventNotifierEnabled && watch != null) { - long timeTaken = watch.stop(); - // emit event that the exchange was sent to the endpoint - 
EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); - } + return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> { + try { + if (eventNotifierEnabled && watch != null) { + long timeTaken = watch.stop(); + // emit event that the exchange was sent to the endpoint + EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); + } - if (producer instanceof ServicePoolAware) { - // release back to the pool - pool.release(endpoint, producer); - } else if (!producer.isSingleton()) { - // stop and shutdown non-singleton producers as we should not leak resources - try { - ServiceHelper.stopAndShutdownService(producer); - } catch (Exception e) { - // ignore and continue - LOG.warn("Error stopping/shutting down producer: " + producer, e); - } + if (producer instanceof ServicePoolAware) { + // release back to the pool + pool.release(endpoint, producer); + } else if (!producer.isSingleton()) { + // stop and shutdown non-singleton producers as we should not leak resources + try { + ServiceHelper.stopAndShutdownService(producer); + } catch (Exception e) { + // ignore and continue + LOG.warn("Error stopping/shutting down producer: " + producer, e); } - } finally { - callback.done(doneSync); } + } finally { + callback.done(doneSync); } }); } catch (Throwable e) { @@ -475,7 +468,7 @@ public class ProducerCache extends ServiceSupport { } protected boolean asyncDispatchExchange(final Endpoint endpoint, Producer producer, - final Processor resultProcessor, Exchange exchange, AsyncCallback callback) { + final Processor resultProcessor, Exchange exchange, AsyncCallback callback) { // now lets dispatch LOG.debug(">>>> {} {}", endpoint, exchange); @@ -659,7 +652,7 @@ public class ProducerCache extends ServiceSupport { public int getCapacity() { int capacity = -1; if (producers instanceof LRUCache) { - LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; + 
LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; capacity = cache.getMaxCacheSize(); } return capacity; @@ -675,7 +668,7 @@ public class ProducerCache extends ServiceSupport { public long getHits() { long hits = -1; if (producers instanceof LRUCache) { - LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; + LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; hits = cache.getHits(); } return hits; @@ -691,7 +684,7 @@ public class ProducerCache extends ServiceSupport { public long getMisses() { long misses = -1; if (producers instanceof LRUCache) { - LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; + LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; misses = cache.getMisses(); } return misses; @@ -707,7 +700,7 @@ public class ProducerCache extends ServiceSupport { public long getEvicted() { long evicted = -1; if (producers instanceof LRUCache) { - LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; + LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; evicted = cache.getEvicted(); } return evicted; @@ -718,7 +711,7 @@ public class ProducerCache extends ServiceSupport { */ public void resetCacheStatistics() { if (producers instanceof LRUCache) { - LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; + LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; cache.resetStatistics(); } if (statistics != null) { @@ -742,7 +735,7 @@ public class ProducerCache extends ServiceSupport { */ public void cleanUp() { if (producers instanceof LRUCache) { - LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; + LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; cache.cleanUp(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/1c8eda40/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java index f218a6e..17d32a1 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java @@ -16,21 +16,21 @@ */ package org.apache.camel.impl; -import org.apache.camel.Exchange; -import org.junit.Assert; - import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import org.apache.camel.Exchange; +import org.junit.Assert; + /** * @version */ -public class DefaultProducerTemplateNonBlockingAsyncTest extends DefaultProducerTemplateAsyncTest{ +public class DefaultProducerTemplateNonBlockingAsyncTest extends DefaultProducerTemplateAsyncTest { @Override protected void setUp() throws Exception { super.setUp(); template.stop(); - template.setSynchronous(true); + template.setThreadedAsyncMode(false); template.start(); }