This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f807dab045286a1a10e9c5bbce68dac9168793b6
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Jan 24 12:24:20 2020 +0100

    CAMEL-14435: camel-core - Optimize getting reactive executor in routing engine
---
 .../apache/camel/impl/engine/BaseRouteService.java |  5 ++++
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  5 +++-
 .../camel/impl/engine/DefaultProducerCache.java    |  2 +-
 .../camel/processor/CamelInternalProcessor.java    | 32 ++++++++++++++++++++--
 .../java/org/apache/camel/processor/Pipeline.java  | 11 +++++---
 .../processor/SharedCamelInternalProcessor.java    | 16 ++++++++---
 .../camel/processor/channel/DefaultChannel.java    |  3 ++
 .../errorhandler/RedeliveryErrorHandler.java       | 29 +++++++++++---------
 8 files changed, 77 insertions(+), 26 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
index a6017c5..27748fd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Channel;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
@@ -174,6 +175,10 @@ public abstract class BaseRouteService extends 
ChildServiceSupport {
                     if (service instanceof RouteIdAware) {
                         ((RouteIdAware) service).setRouteId(route.getId());
                     }
+                    // inject camel context
+                    if (service instanceof CamelContextAware) {
+                        ((CamelContextAware) 
service).setCamelContext(camelContext);
+                    }
 
                     if (service instanceof Consumer) {
                         inputs.put(route, (Consumer) service);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index 665d779..7631456 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -31,6 +31,7 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StaticService;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ExchangeFormatter;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.MessageHelper;
@@ -86,12 +87,14 @@ public class DefaultAsyncProcessorAwaitManager extends 
ServiceSupport implements
     }
 
     public void await(Exchange exchange, CountDownLatch latch) {
+        ReactiveExecutor reactiveExecutor = 
exchange.getContext().getReactiveExecutor();
         // Early exit for pending reactive queued work
         do {
             if (latch.getCount() <= 0) {
                 return;
             }
-        } while 
(exchange.getContext().getReactiveExecutor().executeFromQueue());
+        } while (reactiveExecutor.executeFromQueue());
+
         LOG.trace("Waiting for asynchronous callback before continuing for 
exchangeId: {} -> {}",
                 exchange.getExchangeId(), exchange);
         try {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
index 7dd8f9a..635ab7b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
@@ -74,7 +74,7 @@ public class DefaultProducerCache extends ServiceSupport 
implements ProducerCach
         }
 
         // internal processor used for sending
-        internalProcessor = new SharedCamelInternalProcessor(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
+        internalProcessor = new SharedCamelInternalProcessor(camelContext, new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
     }
 
     protected ProducerServicePool createServicePool(CamelContext camelContext, 
int cacheSize) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 0820b80..0f82c90 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
@@ -43,6 +44,7 @@ import org.apache.camel.spi.Debugger;
 import org.apache.camel.spi.InflightRepository;
 import 
org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
 import org.apache.camel.spi.MessageHistoryFactory;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.StreamCachingStrategy;
@@ -89,12 +91,14 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * The added advices can implement {@link Ordered} to control in which order 
the advices are executed.
  */
-public class CamelInternalProcessor extends DelegateAsyncProcessor {
+public class CamelInternalProcessor extends DelegateAsyncProcessor implements 
CamelContextAware {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelInternalProcessor.class);
 
     private static final Object[] EMPTY_STATES = new Object[0];
 
+    private CamelContext camelContext;
+    private ReactiveExecutor reactiveExecutor;
     private final List<CamelInternalProcessorAdvice<?>> advices = new 
ArrayList<>();
     private byte statefulAdvices;
 
@@ -105,6 +109,24 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
         super(processor);
     }
 
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // optimize to preset reactive executor
+        if (camelContext != null) {
+            reactiveExecutor = camelContext.getReactiveExecutor();
+        }
+    }
+
     /**
      * Adds an {@link CamelInternalProcessorAdvice} advice to the list of 
advices to execute by this internal processor.
      *
@@ -159,6 +181,10 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
             return true;
         }
 
+        if (reactiveExecutor == null) {
+            reactiveExecutor = exchange.getContext().getReactiveExecutor();
+        }
+
         // optimise to use object array for states, and only for the number of 
advices that keep state
         final Object[] states = statefulAdvices > 0 ? new 
Object[statefulAdvices] : EMPTY_STATES;
         // optimise for loop using index access to avoid creating iterator 
object
@@ -198,7 +224,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
                 // ----------------------------------------------------------
                 // callback must be called
                 if (originalCallback != null) {
-                    
exchange.getContext().getReactiveExecutor().schedule(originalCallback);
+                    reactiveExecutor.schedule(originalCallback);
                 }
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
@@ -252,7 +278,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
 
             // optimize to only do after uow processing if really needed
             if (beforeAndAfter) {
-                exchange.getContext().getReactiveExecutor().schedule(() -> {
+                reactiveExecutor.schedule(() -> {
                     // execute any after processor work (in current thread, 
not in the callback)
                     uow.afterProcess(processor, exchange, callback, false);
                 });
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 7557450..3cfba1b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -30,6 +30,7 @@ import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
@@ -50,12 +51,14 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
     private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
 
     private final CamelContext camelContext;
+    private final ReactiveExecutor reactiveExecutor;
     private List<AsyncProcessor> processors;
     private String id;
     private String routeId;
 
     public Pipeline(CamelContext camelContext, Collection<Processor> 
processors) {
         this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.getReactiveExecutor();
         this.processors = 
processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList());
     }
 
@@ -88,9 +91,9 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (exchange.isTransacted()) {
-            camelContext.getReactiveExecutor().scheduleSync(() -> 
Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), 
true));
+            reactiveExecutor.scheduleSync(() -> 
Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), 
true));
         } else {
-            camelContext.getReactiveExecutor().scheduleMain(() -> 
Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), 
true));
+            reactiveExecutor.scheduleMain(() -> 
Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), 
true));
         }
         return false;
     }
@@ -108,7 +111,7 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
             AsyncProcessor processor = processors.get(index.getAndIncrement());
 
             processor.process(exchange, doneSync ->
-                    camelContext.getReactiveExecutor().schedule(() -> 
doProcess(exchange, callback, processors, index, false)));
+                    reactiveExecutor.schedule(() -> doProcess(exchange, 
callback, processors, index, false)));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);
 
@@ -119,7 +122,7 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
                 LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
             }
 
-            camelContext.getReactiveExecutor().schedule(callback);
+            reactiveExecutor.schedule(callback);
         }
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 687b875b..c47fc50 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Ordered;
@@ -31,6 +32,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Service;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelInternalProcessorAdvice;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
@@ -70,10 +72,17 @@ public class SharedCamelInternalProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SharedCamelInternalProcessor.class);
     private static final Object[] EMPTY_STATES = new Object[0];
+    private final CamelContext camelContext;
+    private final ReactiveExecutor reactiveExecutor;
+    private final AsyncProcessorAwaitManager awaitManager;
     private final List<CamelInternalProcessorAdvice> advices;
     private byte statefulAdvices;
 
-    public SharedCamelInternalProcessor(CamelInternalProcessorAdvice... 
advices) {
+    public SharedCamelInternalProcessor(CamelContext camelContext, 
CamelInternalProcessorAdvice... advices) {
+        this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.awaitManager = 
camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
+
         if (advices != null) {
             this.advices = new ArrayList<>(advices.length);
             for (CamelInternalProcessorAdvice advice : advices) {
@@ -93,7 +102,6 @@ public class SharedCamelInternalProcessor {
      * Synchronous API
      */
     public void process(Exchange exchange, AsyncProcessor processor, Processor 
resultProcessor) {
-        final AsyncProcessorAwaitManager awaitManager = 
exchange.getContext().adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         awaitManager.process(new AsyncProcessor() {
             @Override
             public boolean process(Exchange exchange, AsyncCallback callback) {
@@ -206,7 +214,7 @@ public class SharedCamelInternalProcessor {
 
             // optimize to only do after uow processing if really needed
             if (beforeAndAfter) {
-                exchange.getContext().getReactiveExecutor().schedule(() -> {
+                reactiveExecutor.schedule(() -> {
                     // execute any after processor work (in current thread, 
not in the callback)
                     uow.afterProcess(processor, exchange, callback, sync);
                 });
@@ -272,7 +280,7 @@ public class SharedCamelInternalProcessor {
                 // ----------------------------------------------------------
                 // callback must be called
                 if (callback != null) {
-                    
exchange.getContext().getReactiveExecutor().schedule(callback);
+                    reactiveExecutor.schedule(callback);
                 }
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
index 028fb25..92ef735 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
@@ -133,6 +133,7 @@ public class DefaultChannel extends CamelInternalProcessor 
implements Channel {
 
     @Override
     protected void doStart() throws Exception {
+        super.doStart();
         // the output has now been created, so assign the output as the 
processor
         setProcessor(getOutput());
         ServiceHelper.startService(errorHandler, output);
@@ -140,6 +141,7 @@ public class DefaultChannel extends CamelInternalProcessor 
implements Channel {
 
     @Override
     protected void doStop() throws Exception {
+        super.doStop();
         if (isRouteScoped()) {
             // only stop services if not context scoped (as context scoped is 
reused by others)
             ServiceHelper.stopService(output, errorHandler);
@@ -148,6 +150,7 @@ public class DefaultChannel extends CamelInternalProcessor 
implements Channel {
 
     @Override
     protected void doShutdown() throws Exception {
+        super.doShutdown();
         ServiceHelper.stopAndShutdownServices(output, errorHandler);
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 637d2fc..2fcaae2 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -39,6 +39,7 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.ExchangeFormatter;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.ShutdownPrepared;
 import org.apache.camel.spi.UnitOfWork;
@@ -70,6 +71,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
     protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
     protected ScheduledExecutorService executorService;
     protected final CamelContext camelContext;
+    protected final ReactiveExecutor reactiveExecutor;
     protected final AsyncProcessorAwaitManager awaitManager;
     protected final Processor deadLetter;
     protected final String deadLetterUri;
@@ -98,6 +100,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
         ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
 
         this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.getReactiveExecutor();
         this.awaitManager = 
camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         this.redeliveryProcessor = redeliveryProcessor;
         this.deadLetter = deadLetter;
@@ -160,9 +163,9 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
         RedeliveryState state = new RedeliveryState(exchange, callback);
         // Run it
         if (exchange.isTransacted()) {
-            camelContext.getReactiveExecutor().scheduleSync(state);
+            reactiveExecutor.scheduleSync(state);
         } else {
-            camelContext.getReactiveExecutor().scheduleMain(state);
+            reactiveExecutor.scheduleMain(state);
         }
         return false;
     }
@@ -440,7 +443,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Scheduling redelivery task to run in {} 
millis for exchangeId: {}", redeliveryDelay, exchange.getExchangeId());
                         }
-                        executorService.schedule(() -> 
camelContext.getReactiveExecutor().schedule(this::redeliver), redeliveryDelay, 
TimeUnit.MILLISECONDS);
+                        executorService.schedule(() -> 
reactiveExecutor.schedule(this::redeliver), redeliveryDelay, 
TimeUnit.MILLISECONDS);
 
                     } else {
                         // async delayed redelivery was disabled or we are 
transacted so we must be synchronous
@@ -456,9 +459,9 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
                                 // mark the exchange as redelivery exhausted 
so the failure processor / dead letter channel can process the exchange
                                 
exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
                                 // jump to start of loop which then detects 
that we are failed and exhausted
-                                
camelContext.getReactiveExecutor().schedule(this);
+                                reactiveExecutor.schedule(this);
                             } else {
-                                
camelContext.getReactiveExecutor().schedule(this::redeliver);
+                                reactiveExecutor.schedule(this::redeliver);
                             }
                         } catch (InterruptedException e) {
                             redeliverySleepCounter.decrementAndGet();
@@ -467,12 +470,12 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
                             // mark the exchange to stop continue routing when 
interrupted
                             // as we do not want to continue routing (for 
example a task has been cancelled)
                             exchange.setProperty(Exchange.ROUTE_STOP, 
Boolean.TRUE);
-                            
camelContext.getReactiveExecutor().schedule(callback);
+                            reactiveExecutor.schedule(callback);
                         }
                     }
                 } else {
                     // execute the task immediately
-                    
camelContext.getReactiveExecutor().schedule(this::redeliver);
+                    reactiveExecutor.schedule(this::redeliver);
                 }
             } else {
                 // Simple delivery
@@ -480,10 +483,10 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
                     // only process if the exchange hasn't failed
                     // and it has not been handled by the error processor
                     if (isDone(exchange)) {
-                        camelContext.getReactiveExecutor().schedule(callback);
+                        reactiveExecutor.schedule(callback);
                     } else {
                         // error occurred so loop back around which we do by 
invoking the processAsyncErrorHandler
-                        camelContext.getReactiveExecutor().schedule(this);
+                        reactiveExecutor.schedule(this);
                     }
                 });
             }
@@ -561,11 +564,11 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
                 // only process if the exchange hasn't failed
                 // and it has not been handled by the error processor
                 if (isDone(exchange)) {
-                    camelContext.getReactiveExecutor().schedule(callback);
+                    reactiveExecutor.schedule(callback);
                     return;
                 } else {
                     // error occurred so loop back around which we do by 
invoking the processAsyncErrorHandler
-                    camelContext.getReactiveExecutor().schedule(this);
+                    reactiveExecutor.schedule(this);
                 }
             });
         }
@@ -851,7 +854,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
                         
EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, 
processor, deadLetterChannel, deadLetterUri);
                     } finally {
                         // if the fault was handled asynchronously, this 
should be reflected in the callback as well
-                        camelContext.getReactiveExecutor().schedule(callback);
+                        reactiveExecutor.schedule(callback);
                     }
                 });
             } else {
@@ -870,7 +873,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
                     prepareExchangeAfterFailure(exchange, isDeadLetterChannel, 
shouldHandle, shouldContinue);
                 } finally {
                     // callback we are done
-                    camelContext.getReactiveExecutor().schedule(callback);
+                    reactiveExecutor.schedule(callback);
                 }
             }
 

Reply via email to