This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-13636 in repository https://gitbox.apache.org/repos/asf/camel.git
commit ac1d2955bdecabb9ed29f77eda4cd9deb2e06e54 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Jun 12 14:26:37 2019 +0200 CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines --- .../org/apache/camel/spi/ReactiveExecutor.java | 32 +++++++++++--- .../camel/impl/engine/AbstractCamelContext.java | 7 +++- .../camel/impl/engine/DefaultReactiveExecutor.java | 49 ++++++++++++---------- .../camel/processor/CamelInternalProcessor.java | 8 ++-- 4 files changed, 62 insertions(+), 34 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java index 4a21127..37744fb 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java @@ -26,20 +26,40 @@ public interface ReactiveExecutor { // TODO: Add javadoc // TODO: Better name - void scheduleMain(Runnable runnable); + default void schedule(Runnable runnable) { + schedule(runnable, null); + } - void scheduleSync(Runnable runnable); + void schedule(Runnable runnable, String description); - void scheduleMain(Runnable runnable, String description); + default void scheduleMain(Runnable runnable) { + scheduleMain(runnable, null); + } - void schedule(Runnable runnable); + void scheduleMain(Runnable runnable, String description); - void schedule(Runnable runnable, String description); + default void scheduleSync(Runnable runnable) { + scheduleSync(runnable, null); + } void scheduleSync(Runnable runnable, String description); + // TODO: Can we make this so we dont need an method on this interface as its only used once boolean executeFromQueue(); - void callback(AsyncCallback callback); + default void callback(AsyncCallback callback) { + schedule(new Runnable() { + + @Override + public void run() { + callback.done(false); + } + + @Override + public String toString() { + return "Callback[" + callback + "]"; + } + }); + } } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 89b1806..20eadae 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -2587,8 +2587,9 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext shutdownServices(notifier); } - // shutdown executor service and management as the last one + // shutdown executor service, reactive executor and management as the last one shutdownServices(executorServiceManager); + shutdownServices(reactiveExecutor); shutdownServices(managementStrategy); shutdownServices(managementMBeanAssembler); shutdownServices(lifecycleStrategies); @@ -3806,7 +3807,9 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext } public void setReactiveExecutor(ReactiveExecutor reactiveExecutor) { - this.reactiveExecutor = reactiveExecutor; + // special for executorServiceManager as want to stop it manually so + // false in stopOnShutdown + this.reactiveExecutor = doAddService(reactiveExecutor, false); } @Override diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java index 3ce7f0a..e094999 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java @@ -19,16 +19,17 @@ package org.apache.camel.impl.engine; import java.util.LinkedList; import org.apache.camel.AsyncCallback; +import org.apache.camel.StaticService; import org.apache.camel.spi.ReactiveExecutor; +import org.apache.camel.support.service.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Default {@link ReactiveExecutor}. */ -public class DefaultReactiveExecutor implements ReactiveExecutor { +public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveExecutor, StaticService { - // TODO: StaticServiceSupport so we can init/start/stop // TODO: Add mbean info so we can get details private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveExecutor.class); @@ -36,33 +37,27 @@ public class DefaultReactiveExecutor implements ReactiveExecutor { private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(Worker::new); @Override - public void scheduleMain(Runnable runnable) { - workers.get().schedule(runnable, true, true, false); - } - - @Override - public void scheduleSync(Runnable runnable) { - workers.get().schedule(runnable, true, true, true); - } - - @Override public void scheduleMain(Runnable runnable, String description) { - workers.get().schedule(describe(runnable, description), true, true, false); - } - - @Override - public void schedule(Runnable runnable) { - workers.get().schedule(runnable, true, false, false);; + if (description != null) { + runnable = describe(runnable, description); + } + workers.get().schedule(runnable, true, true, false); } @Override public void schedule(Runnable runnable, String description) { - workers.get().schedule(describe(runnable, description), true, false, false); + if (description != null) { + runnable = describe(runnable, description); + } + workers.get().schedule(runnable, true, false, false); } @Override public void scheduleSync(Runnable runnable, String description) { - workers.get().schedule(describe(runnable, description), false, true, true); + if (description != null) { + runnable = describe(runnable, description); + } + workers.get().schedule(runnable, false, true, true); } @Override @@ -97,13 +92,23 @@ public class DefaultReactiveExecutor implements ReactiveExecutor { }; } + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + // noop + } + private static class Worker { private volatile LinkedList<Runnable> queue = new LinkedList<>(); private volatile LinkedList<LinkedList<Runnable>> back; private volatile boolean running; - public void schedule(Runnable runnable, boolean first, boolean main, boolean sync) { + void schedule(Runnable runnable, boolean first, boolean main, boolean sync) { if (main) { if (!queue.isEmpty()) { if (back == null) { @@ -149,7 +154,7 @@ public class DefaultReactiveExecutor implements ReactiveExecutor { } } - public boolean executeFromQueue() { + boolean executeFromQueue() { final Runnable polled = queue != null ? queue.poll() : null; if (polled == null) { return false; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index f039f27..3651856 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -120,7 +120,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { } @Override - public boolean process(Exchange exchange, AsyncCallback ocallback) { + public boolean process(Exchange exchange, AsyncCallback originalCallback) { // ---------------------------------------------------------- // CAMEL END USER - READ ME FOR DEBUGGING TIPS // ---------------------------------------------------------- @@ -137,7 +137,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { if (processor == null || !continueProcessing(exchange)) { // no processor or we should not continue then we are done - ocallback.done(true); + originalCallback.done(true); return true; } @@ -151,7 +151,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { states[i] = state; } catch (Throwable e) { exchange.setException(e); - ocallback.done(true); + originalCallback.done(true); return true; } } @@ -174,7 +174,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // CAMEL END USER - DEBUG ME HERE +++ START +++ // ---------------------------------------------------------- // callback must be called - exchange.getContext().getReactiveExecutor().callback(ocallback); + exchange.getContext().getReactiveExecutor().callback(originalCallback); // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ // ----------------------------------------------------------