This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit f807dab045286a1a10e9c5bbce68dac9168793b6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jan 24 12:24:20 2020 +0100 CAMEL-14435: camel-core - Optimize getting reactive executor in routing engine --- .../apache/camel/impl/engine/BaseRouteService.java | 5 ++++ .../engine/DefaultAsyncProcessorAwaitManager.java | 5 +++- .../camel/impl/engine/DefaultProducerCache.java | 2 +- .../camel/processor/CamelInternalProcessor.java | 32 ++++++++++++++++++++-- .../java/org/apache/camel/processor/Pipeline.java | 11 +++++--- .../processor/SharedCamelInternalProcessor.java | 16 ++++++++--- .../camel/processor/channel/DefaultChannel.java | 3 ++ .../errorhandler/RedeliveryErrorHandler.java | 29 +++++++++++--------- 8 files changed, 77 insertions(+), 26 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java index a6017c5..27748fd 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.Channel; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; @@ -174,6 +175,10 @@ public abstract class BaseRouteService extends ChildServiceSupport { if (service instanceof RouteIdAware) { ((RouteIdAware) service).setRouteId(route.getId()); } + // inject camel context + if (service instanceof CamelContextAware) { + ((CamelContextAware) service).setCamelContext(camelContext); + } if (service instanceof Consumer) { inputs.put(route, (Consumer) service); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java index 665d779..7631456 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java @@ -31,6 +31,7 @@ import org.apache.camel.RuntimeCamelException; import org.apache.camel.StaticService; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.ExchangeFormatter; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.MessageHelper; @@ -86,12 +87,14 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements } public void await(Exchange exchange, CountDownLatch latch) { + ReactiveExecutor reactiveExecutor = exchange.getContext().getReactiveExecutor(); // Early exit for pending reactive queued work do { if (latch.getCount() <= 0) { return; } - } while (exchange.getContext().getReactiveExecutor().executeFromQueue()); + } while (reactiveExecutor.executeFromQueue()); + LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); try { diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java index 7dd8f9a..635ab7b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java @@ -74,7 +74,7 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach } // internal processor used for sending - internalProcessor = new SharedCamelInternalProcessor(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); + internalProcessor = new SharedCamelInternalProcessor(camelContext, new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); } protected ProducerServicePool createServicePool(CamelContext camelContext, int cacheSize) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 0820b80..0f82c90 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; @@ -43,6 +44,7 @@ import org.apache.camel.spi.Debugger; import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor; import org.apache.camel.spi.MessageHistoryFactory; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.StreamCachingStrategy; @@ -89,12 +91,14 @@ import org.slf4j.LoggerFactory; * <p/> * The added advices can implement {@link Ordered} to control in which order the advices are executed. */ -public class CamelInternalProcessor extends DelegateAsyncProcessor { +public class CamelInternalProcessor extends DelegateAsyncProcessor implements CamelContextAware { private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class); private static final Object[] EMPTY_STATES = new Object[0]; + private CamelContext camelContext; + private ReactiveExecutor reactiveExecutor; private final List<CamelInternalProcessorAdvice<?>> advices = new ArrayList<>(); private byte statefulAdvices; @@ -105,6 +109,24 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { super(processor); } + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + // optimize to preset reactive executor + if (camelContext != null) { + reactiveExecutor = camelContext.getReactiveExecutor(); + } + } + /** * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor. * @@ -159,6 +181,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { return true; } + if (reactiveExecutor == null) { + reactiveExecutor = exchange.getContext().getReactiveExecutor(); + } + // optimise to use object array for states, and only for the number of advices that keep state final Object[] states = statefulAdvices > 0 ? new Object[statefulAdvices] : EMPTY_STATES; // optimise for loop using index access to avoid creating iterator object @@ -198,7 +224,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // ---------------------------------------------------------- // callback must be called if (originalCallback != null) { - exchange.getContext().getReactiveExecutor().schedule(originalCallback); + reactiveExecutor.schedule(originalCallback); } // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ @@ -252,7 +278,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // optimize to only do after uow processing if really needed if (beforeAndAfter) { - exchange.getContext().getReactiveExecutor().schedule(() -> { + reactiveExecutor.schedule(() -> { // execute any after processor work (in current thread, not in the callback) uow.afterProcess(processor, exchange, callback, false); }); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java index 7557450..3cfba1b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java @@ -30,6 +30,7 @@ import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; @@ -50,12 +51,14 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); private final CamelContext camelContext; + private final ReactiveExecutor reactiveExecutor; private List<AsyncProcessor> processors; private String id; private String routeId; public Pipeline(CamelContext camelContext, Collection<Processor> processors) { this.camelContext = camelContext; + this.reactiveExecutor = camelContext.getReactiveExecutor(); this.processors = processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList()); } @@ -88,9 +91,9 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo @Override public boolean process(Exchange exchange, AsyncCallback callback) { if (exchange.isTransacted()) { - camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true)); + reactiveExecutor.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true)); } else { - camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true)); + reactiveExecutor.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true)); } return false; } @@ -108,7 +111,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo AsyncProcessor processor = processors.get(index.getAndIncrement()); processor.process(exchange, doneSync -> - camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, index, false))); + reactiveExecutor.schedule(() -> doProcess(exchange, callback, processors, index, false))); } else { ExchangeHelper.copyResults(exchange, exchange); @@ -119,7 +122,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); } - camelContext.getReactiveExecutor().schedule(callback); + reactiveExecutor.schedule(callback); } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java index 687b875b..c47fc50 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java @@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Ordered; @@ -31,6 +32,7 @@ import org.apache.camel.Processor; import org.apache.camel.Service; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelInternalProcessorAdvice; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.Transformer; import org.apache.camel.spi.UnitOfWork; @@ -70,10 +72,17 @@ public class SharedCamelInternalProcessor { private static final Logger LOG = LoggerFactory.getLogger(SharedCamelInternalProcessor.class); private static final Object[] EMPTY_STATES = new Object[0]; + private final CamelContext camelContext; + private final ReactiveExecutor reactiveExecutor; + private final AsyncProcessorAwaitManager awaitManager; private final List<CamelInternalProcessorAdvice> advices; private byte statefulAdvices; - public SharedCamelInternalProcessor(CamelInternalProcessorAdvice... advices) { + public SharedCamelInternalProcessor(CamelContext camelContext, CamelInternalProcessorAdvice... advices) { + this.camelContext = camelContext; + this.reactiveExecutor = camelContext.getReactiveExecutor(); + this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager(); + if (advices != null) { this.advices = new ArrayList<>(advices.length); for (CamelInternalProcessorAdvice advice : advices) { @@ -93,7 +102,6 @@ public class SharedCamelInternalProcessor { * Synchronous API */ public void process(Exchange exchange, AsyncProcessor processor, Processor resultProcessor) { - final AsyncProcessorAwaitManager awaitManager = exchange.getContext().adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager(); awaitManager.process(new AsyncProcessor() { @Override public boolean process(Exchange exchange, AsyncCallback callback) { @@ -206,7 +214,7 @@ public class SharedCamelInternalProcessor { // optimize to only do after uow processing if really needed if (beforeAndAfter) { - exchange.getContext().getReactiveExecutor().schedule(() -> { + reactiveExecutor.schedule(() -> { // execute any after processor work (in current thread, not in the callback) uow.afterProcess(processor, exchange, callback, sync); }); @@ -272,7 +280,7 @@ public class SharedCamelInternalProcessor { // ---------------------------------------------------------- // callback must be called if (callback != null) { - exchange.getContext().getReactiveExecutor().schedule(callback); + reactiveExecutor.schedule(callback); } // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java index 028fb25..92ef735 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java @@ -133,6 +133,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { @Override protected void doStart() throws Exception { + super.doStart(); // the output has now been created, so assign the output as the processor setProcessor(getOutput()); ServiceHelper.startService(errorHandler, output); @@ -140,6 +141,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { @Override protected void doStop() throws Exception { + super.doStop(); if (isRouteScoped()) { // only stop services if not context scoped (as context scoped is reused by others) ServiceHelper.stopService(output, errorHandler); @@ -148,6 +150,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { @Override protected void doShutdown() throws Exception { + super.doShutdown(); ServiceHelper.stopAndShutdownServices(output, errorHandler); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 637d2fc..2fcaae2 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -39,6 +39,7 @@ import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.ExchangeFormatter; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.ShutdownPrepared; import org.apache.camel.spi.UnitOfWork; @@ -70,6 +71,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme protected final AtomicInteger redeliverySleepCounter = new AtomicInteger(); protected ScheduledExecutorService executorService; protected final CamelContext camelContext; + protected final ReactiveExecutor reactiveExecutor; protected final AsyncProcessorAwaitManager awaitManager; protected final Processor deadLetter; protected final String deadLetterUri; @@ -98,6 +100,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this); this.camelContext = camelContext; + this.reactiveExecutor = camelContext.getReactiveExecutor(); this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager(); this.redeliveryProcessor = redeliveryProcessor; this.deadLetter = deadLetter; @@ -160,9 +163,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme RedeliveryState state = new RedeliveryState(exchange, callback); // Run it if (exchange.isTransacted()) { - camelContext.getReactiveExecutor().scheduleSync(state); + reactiveExecutor.scheduleSync(state); } else { - camelContext.getReactiveExecutor().scheduleMain(state); + reactiveExecutor.scheduleMain(state); } return false; } @@ -440,7 +443,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme if (LOG.isTraceEnabled()) { LOG.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, exchange.getExchangeId()); } - executorService.schedule(() -> camelContext.getReactiveExecutor().schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS); + executorService.schedule(() -> reactiveExecutor.schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS); } else { // async delayed redelivery was disabled or we are transacted so we must be synchronous @@ -456,9 +459,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); // jump to start of loop which then detects that we are failed and exhausted - camelContext.getReactiveExecutor().schedule(this); + reactiveExecutor.schedule(this); } else { - camelContext.getReactiveExecutor().schedule(this::redeliver); + reactiveExecutor.schedule(this::redeliver); } } catch (InterruptedException e) { redeliverySleepCounter.decrementAndGet(); @@ -467,12 +470,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // mark the exchange to stop continue routing when interrupted // as we do not want to continue routing (for example a task has been cancelled) exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); - camelContext.getReactiveExecutor().schedule(callback); + reactiveExecutor.schedule(callback); } } } else { // execute the task immediately - camelContext.getReactiveExecutor().schedule(this::redeliver); + reactiveExecutor.schedule(this::redeliver); } } else { // Simple delivery @@ -480,10 +483,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // only process if the exchange hasn't failed // and it has not been handled by the error processor if (isDone(exchange)) { - camelContext.getReactiveExecutor().schedule(callback); + reactiveExecutor.schedule(callback); } else { // error occurred so loop back around which we do by invoking the processAsyncErrorHandler - camelContext.getReactiveExecutor().schedule(this); + reactiveExecutor.schedule(this); } }); } @@ -561,11 +564,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // only process if the exchange hasn't failed // and it has not been handled by the error processor if (isDone(exchange)) { - camelContext.getReactiveExecutor().schedule(callback); + reactiveExecutor.schedule(callback); return; } else { // error occurred so loop back around which we do by invoking the processAsyncErrorHandler - camelContext.getReactiveExecutor().schedule(this); + reactiveExecutor.schedule(this); } }); } @@ -851,7 +854,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); } finally { // if the fault was handled asynchronously, this should be reflected in the callback as well - camelContext.getReactiveExecutor().schedule(callback); + reactiveExecutor.schedule(callback); } }); } else { @@ -870,7 +873,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue); } finally { // callback we are done - camelContext.getReactiveExecutor().schedule(callback); + reactiveExecutor.schedule(callback); } }