This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-13636 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 428c2f558a70c85873226095af4ff99770b23237 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Jun 12 13:01:12 2019 +0200 camel3 - SPI for ReactiveHelper so we can plugin different reactive engines --- .../org/apache/camel/spi/ReactiveExecutor.java | 4 ++++ .../engine/DefaultAsyncProcessorAwaitManager.java | 3 +-- .../camel/impl/engine/DefaultReactiveExecutor.java | 6 +++++ .../camel/processor/CamelInternalProcessor.java | 5 ++-- .../org/apache/camel/processor/LoopProcessor.java | 7 +++--- .../apache/camel/processor/MulticastProcessor.java | 13 +++++------ .../java/org/apache/camel/processor/Pipeline.java | 9 ++++---- .../processor/SharedCamelInternalProcessor.java | 5 ++-- .../org/apache/camel/processor/TryProcessor.java | 5 ++-- .../processor/aggregate/AggregateProcessor.java | 3 +-- .../errorhandler/RedeliveryErrorHandler.java | 27 +++++++++++----------- .../loadbalancer/FailOverLoadBalancer.java | 5 ++-- .../processor/loadbalancer/TopicLoadBalancer.java | 5 ++-- 13 files changed, 48 insertions(+), 49 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java index 8987bd3..4a21127 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java @@ -16,6 +16,8 @@ */ package org.apache.camel.spi; +import org.apache.camel.AsyncCallback; + /** * SPI to plugin different reactive engines in the Camel routing engine. */ @@ -38,4 +40,6 @@ public interface ReactiveExecutor { boolean executeFromQueue(); + void callback(AsyncCallback callback); + } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java index 3a2b41c..8087942 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java @@ -34,7 +34,6 @@ import org.apache.camel.StaticService; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.ExchangeFormatter; import org.apache.camel.support.MessageHelper; -import org.apache.camel.support.ReactiveHelper; import org.apache.camel.support.processor.DefaultExchangeFormatter; import org.apache.camel.support.service.ServiceSupport; @@ -88,7 +87,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements if (latch.getCount() <= 0) { return; } - } while (ReactiveHelper.executeFromQueue()); + } while (exchange.getContext().getReactiveExecutor().executeFromQueue()); log.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); try { diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java index 6a9473b..d700e3c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java @@ -16,6 +16,7 @@ */ package org.apache.camel.impl.engine; +import org.apache.camel.AsyncCallback; import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.support.ReactiveHelper; @@ -58,4 +59,9 @@ public class DefaultReactiveExecutor implements ReactiveExecutor { public boolean executeFromQueue() { return ReactiveHelper.executeFromQueue(); } + + @Override + public void callback(AsyncCallback callback) { + ReactiveHelper.callback(callback); + } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 27faa0b..f039f27 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -48,7 +48,6 @@ import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.MessageHelper; import org.apache.camel.support.OrderedComparator; -import org.apache.camel.support.ReactiveHelper; import org.apache.camel.support.UnitOfWorkHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; import org.apache.camel.util.StopWatch; @@ -175,7 +174,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // CAMEL END USER - DEBUG ME HERE +++ START +++ // ---------------------------------------------------------- // callback must be called - ReactiveHelper.callback(ocallback); + exchange.getContext().getReactiveExecutor().callback(ocallback); // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- @@ -225,7 +224,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- - ReactiveHelper.schedule(() -> { + exchange.getContext().getReactiveExecutor().schedule(() -> { // execute any after processor work (in current thread, not in the callback) if (uow != null) { uow.afterProcess(processor, exchange, callback, false); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java index 75329bc..1628c85 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -25,7 +25,6 @@ import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ExchangeHelper; -import org.apache.camel.support.ReactiveHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; import static org.apache.camel.processor.PipelineHelper.continueProcessing; @@ -54,9 +53,9 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, LoopState state = new LoopState(exchange, callback); if (exchange.isTransacted()) { - ReactiveHelper.scheduleSync(state); + exchange.getContext().getReactiveExecutor().scheduleSync(state); } else { - ReactiveHelper.scheduleMain(state); + exchange.getContext().getReactiveExecutor().scheduleMain(state); } return false; @@ -113,7 +112,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, processor.process(current, doneSync -> { // increment counter after done index++; - ReactiveHelper.schedule(this); + exchange.getContext().getReactiveExecutor().schedule(this); }); } else { // we are done so prepare the result diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 594c936..4d0806f 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -56,7 +56,6 @@ import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.EventHelper; import org.apache.camel.support.ExchangeHelper; -import org.apache.camel.support.ReactiveHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.CastUtils; import org.apache.camel.util.IOHelper; @@ -220,12 +219,12 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat MulticastState state = new MulticastState(exchange, pairs, callback); if (isParallelProcessing()) { - executorService.submit(() -> ReactiveHelper.schedule(state)); + executorService.submit(() -> exchange.getContext().getReactiveExecutor().schedule(state)); } else { if (exchange.isTransacted()) { - ReactiveHelper.scheduleSync(state); + exchange.getContext().getReactiveExecutor().scheduleSync(state); } else { - ReactiveHelper.scheduleMain(state); + exchange.getContext().getReactiveExecutor().scheduleMain(state); } } @@ -237,9 +236,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat protected void schedule(Runnable runnable) { if (isParallelProcessing()) { - executorService.submit(() -> ReactiveHelper.schedule(runnable)); + executorService.submit(() -> camelContext.getReactiveExecutor().schedule(runnable)); } else { - ReactiveHelper.schedule(runnable, "Multicast next step"); + camelContext.getReactiveExecutor().schedule(runnable, "Multicast next step"); } } @@ -524,7 +523,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); } - ReactiveHelper.callback(callback); + camelContext.getReactiveExecutor().callback(callback); } /** diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java index 077c8de..9ffe248 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java @@ -33,7 +33,6 @@ import org.apache.camel.spi.IdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; -import org.apache.camel.support.ReactiveHelper; import org.apache.camel.support.service.ServiceHelper; import static org.apache.camel.processor.PipelineHelper.continueProcessing; @@ -82,10 +81,10 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo @Override public boolean process(Exchange exchange, AsyncCallback callback) { if (exchange.isTransacted()) { - ReactiveHelper.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true), + camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true), "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]"); } else { - ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true), + camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true), "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]"); } return false; @@ -105,7 +104,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo AsyncProcessor processor = processors.next(); processor.process(exchange, doneSync -> - ReactiveHelper.schedule(() -> doProcess(exchange, callback, processors, false), + camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false), "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]")); } else { ExchangeHelper.copyResults(exchange, exchange); @@ -115,7 +114,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); - ReactiveHelper.callback(callback); + camelContext.getReactiveExecutor().callback(callback); } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java index 0cf2654..1a5e92d 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java @@ -36,7 +36,6 @@ import org.apache.camel.spi.Transformer; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter; import org.apache.camel.support.OrderedComparator; -import org.apache.camel.support.ReactiveHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -192,7 +191,7 @@ public class SharedCamelInternalProcessor { // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- - ReactiveHelper.schedule(() -> { + exchange.getContext().getReactiveExecutor().schedule(() -> { // execute any after processor work (in current thread, not in the callback) if (uow != null) { uow.afterProcess(processor, exchange, callback, sync); @@ -255,7 +254,7 @@ public class SharedCamelInternalProcessor { // CAMEL END USER - DEBUG ME HERE +++ START +++ // ---------------------------------------------------------- // callback must be called - ReactiveHelper.callback(callback); + exchange.getContext().getReactiveExecutor().callback(callback); // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java index 264ba2b..9b1d50d 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java @@ -30,7 +30,6 @@ import org.apache.camel.spi.IdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; -import org.apache.camel.support.ReactiveHelper; import org.apache.camel.support.service.ServiceHelper; /** @@ -61,7 +60,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc public boolean process(Exchange exchange, AsyncCallback callback) { - ReactiveHelper.schedule(new TryState(exchange, callback)); + exchange.getContext().getReactiveExecutor().schedule(new TryState(exchange, callback)); return false; } @@ -90,7 +89,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc Processor processor = processors.next(); AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); - async.process(exchange, doneSync -> ReactiveHelper.schedule(this)); + async.process(exchange, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this)); } else { ExchangeHelper.prepareOutToIn(exchange); exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 2ccde2e..f4508dd 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -64,7 +64,6 @@ import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.LRUCacheFactory; import org.apache.camel.support.LoggingExceptionHandler; import org.apache.camel.support.NoLock; -import org.apache.camel.support.ReactiveHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StopWatch; @@ -770,7 +769,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat // send this exchange // the call to schedule last if needed to ensure in-order processing of the aggregates - executorService.submit(() -> ReactiveHelper.scheduleSync(() -> processor.process(exchange, done -> { + executorService.submit(() -> camelContext.getReactiveExecutor().scheduleSync(() -> processor.process(exchange, done -> { // log exception if there was a problem if (exchange.getException() != null) { // if there was an exception then let the exception handler handle it diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 782d8a7..2628c67 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -48,7 +48,6 @@ import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.EventHelper; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.MessageHelper; -import org.apache.camel.support.ReactiveHelper; import org.apache.camel.support.processor.DefaultExchangeFormatter; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; @@ -153,9 +152,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme RedeliveryState state = new RedeliveryState(exchange, callback); // Run it if (exchange.isTransacted()) { - ReactiveHelper.scheduleSync(state); + camelContext.getReactiveExecutor().scheduleSync(state); } else { - ReactiveHelper.scheduleMain(state); + camelContext.getReactiveExecutor().scheduleMain(state); } return false; } @@ -442,7 +441,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme if (log.isTraceEnabled()) { log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, exchange.getExchangeId()); } - executorService.schedule(() -> ReactiveHelper.schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS); + executorService.schedule(() -> camelContext.getReactiveExecutor().schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS); } else { // async delayed redelivery was disabled or we are transacted so we must be synchronous @@ -458,9 +457,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); // jump to start of loop which then detects that we are failed and exhausted - ReactiveHelper.schedule(this); + camelContext.getReactiveExecutor().schedule(this); } else { - ReactiveHelper.schedule(this::redeliver); + camelContext.getReactiveExecutor().schedule(this::redeliver); } } catch (InterruptedException e) { redeliverySleepCounter.decrementAndGet(); @@ -469,12 +468,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // mark the exchange to stop continue routing when interrupted // as we do not want to continue routing (for example a task has been cancelled) exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); - ReactiveHelper.callback(callback); + camelContext.getReactiveExecutor().callback(callback); } } } else { // execute the task immediately - ReactiveHelper.schedule(this::redeliver); + camelContext.getReactiveExecutor().schedule(this::redeliver); } } else { // Simple delivery @@ -482,10 +481,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // only process if the exchange hasn't failed // and it has not been handled by the error processor if (isDone(exchange)) { - ReactiveHelper.callback(callback); + camelContext.getReactiveExecutor().callback(callback); } else { // error occurred so loop back around which we do by invoking the processAsyncErrorHandler - ReactiveHelper.schedule(this); + camelContext.getReactiveExecutor().schedule(this); } }); } @@ -563,11 +562,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // only process if the exchange hasn't failed // and it has not been handled by the error processor if (isDone(exchange)) { - ReactiveHelper.callback(callback); + camelContext.getReactiveExecutor().callback(callback); return; } else { // error occurred so loop back around which we do by invoking the processAsyncErrorHandler - ReactiveHelper.schedule(this); + camelContext.getReactiveExecutor().schedule(this); } }); } @@ -845,7 +844,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); } finally { // if the fault was handled asynchronously, this should be reflected in the callback as well - ReactiveHelper.callback(callback); + camelContext.getReactiveExecutor().callback(callback); } }); } else { @@ -864,7 +863,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue); } finally { // callback we are done - ReactiveHelper.callback(callback); + camelContext.getReactiveExecutor().callback(callback); } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java index 059c795..89a6f91 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java @@ -27,7 +27,6 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.Traceable; import org.apache.camel.support.ExchangeHelper; -import org.apache.camel.support.ReactiveHelper; import org.apache.camel.util.ObjectHelper; /** @@ -159,7 +158,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab public boolean process(final Exchange exchange, final AsyncCallback callback) { AsyncProcessor[] processors = doGetProcessors(); - ReactiveHelper.schedule(new State(exchange, callback, processors)::run); + exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run); return false; } @@ -246,7 +245,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab // process the exchange log.debug("Processing failover at attempt {} for {}", attempts, copy); - processor.process(copy, doneSync -> ReactiveHelper.schedule(this::run)); + processor.process(copy, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this::run)); } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java index 09ba098..ebd0f87 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java @@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.support.ReactiveHelper; /** * A {@link LoadBalancer} implementations which sends to all destinations @@ -33,7 +32,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport { public boolean process(final Exchange exchange, final AsyncCallback callback) { AsyncProcessor[] processors = doGetProcessors(); - ReactiveHelper.schedule(new State(exchange, callback, processors)::run); + exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run); return false; } @@ -64,7 +63,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport { exchange.setException(current.getException()); callback.done(false); } else { - ReactiveHelper.schedule(this::run); + exchange.getContext().getReactiveExecutor().schedule(this::run); } } }