CAMEL-9899: camel-rx - Use a worker pool for tasks such as stopping consumers


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0df2ba7c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0df2ba7c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0df2ba7c

Branch: refs/heads/master
Commit: 0df2ba7ce7e3f99882e6d3f17cf31f025e9b79ea
Parents: 8962c4d
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu Apr 21 22:47:10 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Apr 21 22:48:50 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/ReactiveCamel.java | 27 ++++++++++++++++++--
 .../camel/rx/support/EndpointSubscribeFunc.java |  8 ++++--
 .../camel/rx/support/EndpointSubscription.java  | 21 ++++++++++-----
 3 files changed, 46 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0df2ba7c/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index b0877f8..7448afd 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.rx;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -34,9 +36,30 @@ import rx.functions.Func1;
  */
 public class ReactiveCamel {
     private final CamelContext camelContext;
+    // a worker pool for running tasks such as stopping consumers which should not use the event loop
+    // thread from rx-java but use our own thread to process such tasks
+    private final ExecutorService workerPool;
 
+    /**
+     * Wrap the CamelContext as reactive.
+     * <p/>
+     * Uses a default value of 10 as maximum number of threads in the worker pool used for reactive background tasks.
+     *
+     * @param camelContext  the CamelContext
+     */
     public ReactiveCamel(CamelContext camelContext) {
+        this(camelContext, 10);
+    }
+
+    /**
+     * Wrap the CamelContext as reactive.
+     *
+     * @param camelContext  the CamelContext
+     * @param maxWorkerPoolSize  maximum number of threads in the worker pool used for reactive background tasks
+     */
+    public ReactiveCamel(CamelContext camelContext, int maxWorkerPoolSize) {
         this.camelContext = camelContext;
+        this.workerPool = camelContext.getExecutorServiceManager().newThreadPool(this, "ReactiveCamelWorker", 0, maxWorkerPoolSize);
     }
 
     public CamelContext getCamelContext() {
@@ -134,7 +157,7 @@ public class ReactiveCamel {
      */
     private <T> Observable<T> createEndpointObservable(final Endpoint endpoint,
                                                        final Func1<Exchange, T> converter) {
-        Observable.OnSubscribe<T> func = new EndpointSubscribeFunc<T>(endpoint, converter);
+        Observable.OnSubscribe<T> func = new EndpointSubscribeFunc<T>(workerPool, endpoint, converter);
         return new EndpointObservable<T>(endpoint, func);
     }
 
@@ -142,6 +165,6 @@ public class ReactiveCamel {
      * Return a newly created {@link Observable} without conversion
      */
     private Observable<Exchange> createEndpointObservable(final Endpoint endpoint) {
-        return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<Exchange>(endpoint, exchange -> exchange));
+        return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<Exchange>(workerPool, endpoint, exchange -> exchange));
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0df2ba7c/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
index df79083..2f5f6dc 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.rx.support;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import rx.Observable;
@@ -24,16 +26,18 @@ import rx.functions.Func1;
 
 public class EndpointSubscribeFunc<T> implements Observable.OnSubscribe<T> {
 
+    private final ExecutorService workerPool;
     private final Endpoint endpoint;
     private final Func1<Exchange, T> converter;
 
-    public EndpointSubscribeFunc(Endpoint endpoint, Func1<Exchange, T> converter) {
+    public EndpointSubscribeFunc(ExecutorService workerPool, Endpoint endpoint, Func1<Exchange, T> converter) {
+        this.workerPool = workerPool;
         this.endpoint = endpoint;
         this.converter = converter;
     }
 
     @Override
     public void call(Subscriber<? super T> subscriber) {
-        subscriber.add(new EndpointSubscription<T>(endpoint, subscriber, converter));
+        subscriber.add(new EndpointSubscription<T>(workerPool, endpoint, subscriber, converter));
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0df2ba7c/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
index 0b2b6df..e8e6d38 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.rx.support;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.Consumer;
@@ -38,13 +39,15 @@ public class EndpointSubscription<T> extends ServiceSupport implements Subscript
 
     private static final Logger LOG = LoggerFactory.getLogger(EndpointSubscription.class);
 
+    private final ExecutorService workerPool;
     private final Endpoint endpoint;
     private final Observer<? super T> observer;
     private Consumer consumer;
     private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
 
-    public EndpointSubscription(Endpoint endpoint, final Observer<? super T> observer,
+    public EndpointSubscription(ExecutorService workerPool, Endpoint endpoint, final Observer<? super T> observer,
                                 final Func1<Exchange, T> func) {
+        this.workerPool = workerPool;
         this.endpoint = endpoint;
         this.observer = observer;
 
@@ -73,11 +76,17 @@ public class EndpointSubscription<T> extends ServiceSupport implements Subscript
     public void unsubscribe() {
         if (unsubscribed.compareAndSet(false, true)) {
             if (consumer != null) {
-                try {
-                    ServiceHelper.stopServices(consumer);
-                } catch (Exception e) {
-                    LOG.warn("Error stopping consumer: " + consumer + " due " + e.getMessage() + ". This exception is ignored.", e);
-                }
+                // must stop the consumer from the worker pool as we should not stop ourself from a thread from ourself
+                workerPool.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            ServiceHelper.stopServices(consumer);
+                        } catch (Exception e) {
+                            LOG.warn("Error stopping consumer: " + consumer + " due " + e.getMessage() + ". This exception is ignored.", e);
+                        }
+                    }
+                });
             }
         }
     }

Reply via email to