CAMEL-9899: camel-rx - Use a worker pool for tasks such as stopping consumers
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e8bff3a0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e8bff3a0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e8bff3a0 Branch: refs/heads/camel-2.17.x Commit: e8bff3a0aef10e3d98cc100bfd065d6a81ca9acf Parents: 4c97a2c Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Apr 21 22:47:10 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Apr 21 22:50:53 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/rx/ReactiveCamel.java | 27 ++++++++++++++++++-- .../camel/rx/support/EndpointSubscribeFunc.java | 8 ++++-- .../camel/rx/support/EndpointSubscription.java | 21 ++++++++++----- 3 files changed, 46 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e8bff3a0/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java index d1365bb..1cbe90e 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java @@ -16,6 +16,8 @@ */ package org.apache.camel.rx; +import java.util.concurrent.ExecutorService; + import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -34,9 +36,30 @@ import rx.functions.Func1; */ public class ReactiveCamel { private final CamelContext camelContext; + // a worker pool for running tasks such as stopping consumers which should not use the event loop + // thread from rx-java but use our own thread to process such tasks + private final ExecutorService workerPool; + /** + * Wrap the CamelContext as reactive. + * <p/> + * Uses a default value of 10 as maximum number of threads in the worker pool used for reactive background tasks. + * + * @param camelContext the CamelContext + */ public ReactiveCamel(CamelContext camelContext) { + this(camelContext, 10); + } + + /** + * Wrap the CamelContext as reactive. + * + * @param camelContext the CamelContext + * @param maxWorkerPoolSize maximum number of threads in the worker pool used for reactive background tasks + */ + public ReactiveCamel(CamelContext camelContext, int maxWorkerPoolSize) { this.camelContext = camelContext; + this.workerPool = camelContext.getExecutorServiceManager().newThreadPool(this, "ReactiveCamelWorker", 0, maxWorkerPoolSize); } /** @@ -133,7 +156,7 @@ public class ReactiveCamel { */ protected <T> Observable<T> createEndpointObservable(final Endpoint endpoint, final Func1<Exchange, T> converter) { - Observable.OnSubscribe<T> func = new EndpointSubscribeFunc<T>(endpoint, converter); + Observable.OnSubscribe<T> func = new EndpointSubscribeFunc<T>(workerPool, endpoint, converter); return new EndpointObservable<T>(endpoint, func); } @@ -141,7 +164,7 @@ public class ReactiveCamel { * Return a newly created {@link Observable} without conversion */ protected Observable<Exchange> createEndpointObservable(final Endpoint endpoint) { - return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<Exchange>(endpoint, new Func1<Exchange, Exchange>() { + return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<Exchange>(workerPool, endpoint, new Func1<Exchange, Exchange>() { @Override public Exchange call(Exchange exchange) { return exchange; http://git-wip-us.apache.org/repos/asf/camel/blob/e8bff3a0/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java index df79083..2f5f6dc 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java @@ -16,6 +16,8 @@ */ package org.apache.camel.rx.support; +import java.util.concurrent.ExecutorService; + import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import rx.Observable; @@ -24,16 +26,18 @@ import rx.functions.Func1; public class EndpointSubscribeFunc<T> implements Observable.OnSubscribe<T> { + private final ExecutorService workerPool; private final Endpoint endpoint; private final Func1<Exchange, T> converter; - public EndpointSubscribeFunc(Endpoint endpoint, Func1<Exchange, T> converter) { + public EndpointSubscribeFunc(ExecutorService workerPool, Endpoint endpoint, Func1<Exchange, T> converter) { + this.workerPool = workerPool; this.endpoint = endpoint; this.converter = converter; } @Override public void call(Subscriber<? super T> subscriber) { - subscriber.add(new EndpointSubscription<T>(endpoint, subscriber, converter)); + subscriber.add(new EndpointSubscription<T>(workerPool, endpoint, subscriber, converter)); } } http://git-wip-us.apache.org/repos/asf/camel/blob/e8bff3a0/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java index 593e1d4..3c74d81 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java @@ -16,6 +16,7 @@ */ package org.apache.camel.rx.support; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.Consumer; @@ -37,13 +38,15 @@ public class EndpointSubscription<T> implements Subscription { private static final Logger LOG = LoggerFactory.getLogger(EndpointSubscription.class); + private final ExecutorService workerPool; private final Endpoint endpoint; private final Observer<? super T> observer; private Consumer consumer; private final AtomicBoolean unsubscribed = new AtomicBoolean(false); - public EndpointSubscription(Endpoint endpoint, final Observer<? super T> observer, + public EndpointSubscription(ExecutorService workerPool, Endpoint endpoint, final Observer<? super T> observer, final Func1<Exchange, T> func) { + this.workerPool = workerPool; this.endpoint = endpoint; this.observer = observer; @@ -69,11 +72,17 @@ public class EndpointSubscription<T> implements Subscription { public void unsubscribe() { if (unsubscribed.compareAndSet(false, true)) { if (consumer != null) { - try { - ServiceHelper.stopServices(consumer); - } catch (Exception e) { - LOG.warn("Error stopping consumer: " + consumer + " due " + e.getMessage() + ". This exception is ignored.", e); - } + // must stop the consumer from the worker pool as we should not stop ourself from a thread from ourself + workerPool.submit(new Runnable() { + @Override + public void run() { + try { + ServiceHelper.stopServices(consumer); + } catch (Exception e) { + LOG.warn("Error stopping consumer: " + consumer + " due " + e.getMessage() + ". This exception is ignored.", e); + } + } + }); } } }