This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ac1d2955bdecabb9ed29f77eda4cd9deb2e06e54
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Jun 12 14:26:37 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different 
reactive engines
---
 .../org/apache/camel/spi/ReactiveExecutor.java     | 32 +++++++++++---
 .../camel/impl/engine/AbstractCamelContext.java    |  7 +++-
 .../camel/impl/engine/DefaultReactiveExecutor.java | 49 ++++++++++++----------
 .../camel/processor/CamelInternalProcessor.java    |  8 ++--
 4 files changed, 62 insertions(+), 34 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 4a21127..37744fb 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -26,20 +26,40 @@ public interface ReactiveExecutor {
     // TODO: Add javadoc
     // TODO: Better name
 
-    void scheduleMain(Runnable runnable);
+    default void schedule(Runnable runnable) {
+        schedule(runnable, null);
+    }
 
-    void scheduleSync(Runnable runnable);
+    void schedule(Runnable runnable, String description);
 
-    void scheduleMain(Runnable runnable, String description);
+    default void scheduleMain(Runnable runnable) {
+        scheduleMain(runnable, null);
+    }
 
-    void schedule(Runnable runnable);
+    void scheduleMain(Runnable runnable, String description);
 
-    void schedule(Runnable runnable, String description);
+    default void scheduleSync(Runnable runnable) {
+        scheduleSync(runnable, null);
+    }
 
     void scheduleSync(Runnable runnable, String description);
 
+    // TODO: Can we make this so we don't need a method on this interface as 
it's only used once
     boolean executeFromQueue();
 
-    void callback(AsyncCallback callback);
+    default void callback(AsyncCallback callback) {
+        schedule(new Runnable() {
+
+            @Override
+            public void run() {
+                callback.done(false);
+            }
+
+            @Override
+            public String toString() {
+                return "Callback[" + callback + "]";
+            }
+        });
+    }
 
 }
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 89b1806..20eadae 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2587,8 +2587,9 @@ public abstract class AbstractCamelContext extends 
ServiceSupport implements Ext
             shutdownServices(notifier);
         }
 
-        // shutdown executor service and management as the last one
+        // shutdown executor service, reactive executor and management as the 
last one
         shutdownServices(executorServiceManager);
+        shutdownServices(reactiveExecutor);
         shutdownServices(managementStrategy);
         shutdownServices(managementMBeanAssembler);
         shutdownServices(lifecycleStrategies);
@@ -3806,7 +3807,9 @@ public abstract class AbstractCamelContext extends 
ServiceSupport implements Ext
     }
 
     public void setReactiveExecutor(ReactiveExecutor reactiveExecutor) {
-        this.reactiveExecutor = reactiveExecutor;
+        // special for reactiveExecutor as we want to stop it manually so
+        // false in stopOnShutdown
+        this.reactiveExecutor = doAddService(reactiveExecutor, false);
     }
 
     @Override
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 3ce7f0a..e094999 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -19,16 +19,17 @@ package org.apache.camel.impl.engine;
 import java.util.LinkedList;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.StaticService;
 import org.apache.camel.spi.ReactiveExecutor;
+import org.apache.camel.support.service.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Default {@link ReactiveExecutor}.
  */
-public class DefaultReactiveExecutor implements ReactiveExecutor {
+public class DefaultReactiveExecutor extends ServiceSupport implements 
ReactiveExecutor, StaticService {
 
-    // TODO: StaticServiceSupport so we can init/start/stop
     // TODO: Add mbean info so we can get details
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultReactiveExecutor.class);
@@ -36,33 +37,27 @@ public class DefaultReactiveExecutor implements 
ReactiveExecutor {
     private final ThreadLocal<Worker> workers = 
ThreadLocal.withInitial(Worker::new);
 
     @Override
-    public void scheduleMain(Runnable runnable) {
-        workers.get().schedule(runnable, true, true, false);
-    }
-
-    @Override
-    public void scheduleSync(Runnable runnable) {
-        workers.get().schedule(runnable, true, true, true);
-    }
-
-    @Override
     public void scheduleMain(Runnable runnable, String description) {
-        workers.get().schedule(describe(runnable, description), true, true, 
false);
-    }
-
-    @Override
-    public void schedule(Runnable runnable) {
-        workers.get().schedule(runnable, true, false, false);;
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        workers.get().schedule(runnable, true, true, false);
     }
 
     @Override
     public void schedule(Runnable runnable, String description) {
-        workers.get().schedule(describe(runnable, description), true, false, 
false);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        workers.get().schedule(runnable, true, false, false);
     }
 
     @Override
     public void scheduleSync(Runnable runnable, String description) {
-        workers.get().schedule(describe(runnable, description), false, true, 
true);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        workers.get().schedule(runnable, false, true, true);
     }
 
     @Override
@@ -97,13 +92,23 @@ public class DefaultReactiveExecutor implements 
ReactiveExecutor {
         };
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
     private static class Worker {
 
         private volatile LinkedList<Runnable> queue = new LinkedList<>();
         private volatile LinkedList<LinkedList<Runnable>> back;
         private volatile boolean running;
 
-        public void schedule(Runnable runnable, boolean first, boolean main, 
boolean sync) {
+        void schedule(Runnable runnable, boolean first, boolean main, boolean 
sync) {
             if (main) {
                 if (!queue.isEmpty()) {
                     if (back == null) {
@@ -149,7 +154,7 @@ public class DefaultReactiveExecutor implements 
ReactiveExecutor {
             }
         }
 
-        public boolean executeFromQueue() {
+        boolean executeFromQueue() {
             final Runnable polled = queue != null ? queue.poll() : null;
             if (polled == null) {
                 return false;
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index f039f27..3651856 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -120,7 +120,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
     }
 
     @Override
-    public boolean process(Exchange exchange, AsyncCallback ocallback) {
+    public boolean process(Exchange exchange, AsyncCallback originalCallback) {
         // ----------------------------------------------------------
         // CAMEL END USER - READ ME FOR DEBUGGING TIPS
         // ----------------------------------------------------------
@@ -137,7 +137,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
 
         if (processor == null || !continueProcessing(exchange)) {
             // no processor or we should not continue then we are done
-            ocallback.done(true);
+            originalCallback.done(true);
             return true;
         }
 
@@ -151,7 +151,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
                 states[i] = state;
             } catch (Throwable e) {
                 exchange.setException(e);
-                ocallback.done(true);
+                originalCallback.done(true);
                 return true;
             }
         }
@@ -174,7 +174,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
                 // CAMEL END USER - DEBUG ME HERE +++ START +++
                 // ----------------------------------------------------------
                 // callback must be called
-                
exchange.getContext().getReactiveExecutor().callback(ocallback);
+                
exchange.getContext().getReactiveExecutor().callback(originalCallback);
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------

Reply via email to