This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 074cdce530367dddae229b72e9ab4cabe5b7073b
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Jan 24 13:16:35 2020 +0100

    CAMEL-14435: camel-core - Optimize routing engine
---
 .../camel/impl/engine/DefaultRouteContext.java     |  2 +-
 .../camel/impl/engine/DefaultUnitOfWork.java       |  9 +++--
 .../impl/engine/SubscribeMethodProcessor.java      |  2 +-
 .../camel/processor/CamelInternalProcessor.java    | 43 ++++++----------------
 .../apache/camel/processor/MulticastProcessor.java |  2 +-
 .../apache/camel/processor/UnitOfWorkProducer.java |  2 +-
 .../camel/processor/channel/DefaultChannel.java    | 13 +++++--
 .../org/apache/camel/reifier/AggregateReifier.java |  2 +-
 .../apache/camel/reifier/OnCompletionReifier.java  |  2 +-
 .../org/apache/camel/reifier/ProcessorReifier.java |  2 +-
 .../apache/camel/reifier/ResequenceReifier.java    |  4 +-
 .../org/apache/camel/reifier/WireTapReifier.java   |  2 +-
 12 files changed, 36 insertions(+), 49 deletions(-)

diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
index 282fbd5..e7739d6 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
@@ -176,7 +176,7 @@ public class DefaultRouteContext implements RouteContext {
             Processor target = new Pipeline(getCamelContext(), 
eventDrivenProcessors);
 
             // and wrap it in a unit of work so the UoW is on the top, so the 
entire route will be in the same UoW
-            CamelInternalProcessor internal = new 
CamelInternalProcessor(target);
+            CamelInternalProcessor internal = new 
CamelInternalProcessor(getCamelContext(), target);
             internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(this));
 
             // and then optionally add route policy processor if a custom 
policy is set
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index deba2d8..e2d69dd 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -33,6 +33,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.Service;
+import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.SynchronizationVetoable;
@@ -60,6 +61,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service 
{
     private String id;
     private final Logger log;
     private final CamelContext context;
+    private final InflightRepository inflightRepository;
     private RouteContext prevRouteContext;
     private RouteContext routeContext;
     private List<Synchronization> synchronizations;
@@ -76,6 +78,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service 
{
             log.trace("UnitOfWork created for ExchangeId: {} with {}", 
exchange.getExchangeId(), exchange);
         }
         context = exchange.getContext();
+        inflightRepository = exchange.getContext().getInflightRepository();
 
         if (context.isAllowUseOriginalMessage()) {
             // special for JmsMessage as it can cause it to loose headers 
later.
@@ -112,7 +115,7 @@ public class DefaultUnitOfWork implements UnitOfWork, 
Service {
         }
 
         // register to inflight registry
-        context.getInflightRepository().add(exchange);
+        inflightRepository.add(exchange);
     }
 
     UnitOfWork newInstance(Exchange exchange) {
@@ -206,9 +209,7 @@ public class DefaultUnitOfWork implements UnitOfWork, 
Service {
         UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, log);
 
         // unregister from inflight registry, before signalling we are done
-        if (exchange.getContext() != null) {
-            exchange.getContext().getInflightRepository().remove(exchange);
-        }
+        inflightRepository.remove(exchange);
 
         // then fire event to signal the exchange is done
         try {
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
index b010417..14d2fc9 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
@@ -57,7 +57,7 @@ public final class SubscribeMethodProcessor extends 
AsyncProcessorSupport implem
         Processor answer = 
endpoint.getCamelContext().adapt(ExtendedCamelContext.class)
                 
.getBeanProcessorFactory().createBeanProcessor(endpoint.getCamelContext(), 
pojo, method);
         // must ensure the consumer is being executed in an unit of work so 
synchronization callbacks etc is invoked
-        CamelInternalProcessor internal = new CamelInternalProcessor(answer);
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(endpoint.getCamelContext(), answer);
         internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
 
         Predicate p;
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index bcd767b..527a972 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -92,42 +92,29 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * The added advices can implement {@link Ordered} to control in which order 
the advices are executed.
  */
-public class CamelInternalProcessor extends DelegateAsyncProcessor implements 
CamelContextAware {
+public class CamelInternalProcessor extends DelegateAsyncProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelInternalProcessor.class);
 
     private static final Object[] EMPTY_STATES = new Object[0];
 
-    private CamelContext camelContext;
-    private ReactiveExecutor reactiveExecutor;
-    private ShutdownStrategy shutdownStrategy;
+    private final CamelContext camelContext;
+    private final ReactiveExecutor reactiveExecutor;
+    private final ShutdownStrategy shutdownStrategy;
     private final List<CamelInternalProcessorAdvice<?>> advices = new 
ArrayList<>();
     private byte statefulAdvices;
 
-    public CamelInternalProcessor() {
+    public CamelInternalProcessor(CamelContext camelContext) {
+        this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.shutdownStrategy = camelContext.getShutdownStrategy();
     }
 
-    public CamelInternalProcessor(Processor processor) {
+    public CamelInternalProcessor(CamelContext camelContext, Processor 
processor) {
         super(processor);
-    }
-
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    public void setCamelContext(CamelContext camelContext) {
         this.camelContext = camelContext;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-
-        // optimize to preset reactive executor
-        if (camelContext != null) {
-            reactiveExecutor = camelContext.getReactiveExecutor();
-            shutdownStrategy = camelContext.getShutdownStrategy();
-        }
+        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.shutdownStrategy = camelContext.getShutdownStrategy();
     }
 
     /**
@@ -184,14 +171,6 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements Ca
             return true;
         }
 
-        // TODO: should not be needed
-        if (reactiveExecutor == null) {
-            reactiveExecutor = exchange.getContext().getReactiveExecutor();
-        }
-        if (shutdownStrategy == null) {
-            shutdownStrategy = exchange.getContext().getShutdownStrategy();
-        }
-
         // optimise to use object array for states, and only for the number of 
advices that keep state
         final Object[] states = statefulAdvices > 0 ? new 
Object[statefulAdvices] : EMPTY_STATES;
         // optimise for loop using index access to avoid creating iterator 
object
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 2f4b637..7a4c0bd 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -765,7 +765,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
      * @return the unit of work processor
      */
     protected Processor createUnitOfWorkProcessor(RouteContext routeContext, 
Processor processor, Exchange exchange) {
-        CamelInternalProcessor internal = new 
CamelInternalProcessor(processor);
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(exchange.getContext(), processor);
 
         // and wrap it in a unit of work so the UoW is on the top, so the 
entire route will be in the same UoW
         UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, 
UnitOfWork.class);
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index bd92cab..fecac02 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -41,7 +41,7 @@ public final class UnitOfWorkProducer extends 
DefaultAsyncProducer {
         super(producer.getEndpoint());
         this.producer = producer;
         // wrap in unit of work
-        CamelInternalProcessor internal = new CamelInternalProcessor(producer);
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(producer.getEndpoint().getCamelContext(), producer);
         internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
         this.processor = internal;
     }
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
index 92ef735..5034a07 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
@@ -70,6 +70,10 @@ public class DefaultChannel extends CamelInternalProcessor 
implements Channel {
     private RouteContext routeContext;
     private boolean routeScoped = true;
 
+    public DefaultChannel(CamelContext camelContext) {
+        super(camelContext);
+    }
+
     @Override
     public Processor getOutput() {
         // the errorHandler is already decorated with interceptors
@@ -133,7 +137,8 @@ public class DefaultChannel extends CamelInternalProcessor 
implements Channel {
 
     @Override
     protected void doStart() throws Exception {
-        super.doStart();
+        // do not call super as we want to be in control here of the lifecycle
+
         // the output has now been created, so assign the output as the 
processor
         setProcessor(getOutput());
         ServiceHelper.startService(errorHandler, output);
@@ -141,7 +146,8 @@ public class DefaultChannel extends CamelInternalProcessor 
implements Channel {
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
+        // do not call super as we want to be in control here of the lifecycle
+
         if (isRouteScoped()) {
             // only stop services if not context scoped (as context scoped is 
reused by others)
             ServiceHelper.stopService(output, errorHandler);
@@ -150,7 +156,8 @@ public class DefaultChannel extends CamelInternalProcessor 
implements Channel {
 
     @Override
     protected void doShutdown() throws Exception {
-        super.doShutdown();
+        // do not call super as we want to be in control here of the lifecycle
+
         ServiceHelper.stopAndShutdownServices(output, errorHandler);
     }
 
diff --git 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java
 
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index 5fb0845..791747e 100644
--- 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ 
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -52,7 +52,7 @@ public class AggregateReifier extends 
ProcessorReifier<AggregateDefinition> {
         Processor childProcessor = this.createChildProcessor(routeContext, 
true);
 
         // wrap the aggregate route in a unit of work processor
-        CamelInternalProcessor internal = new 
CamelInternalProcessor(childProcessor);
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(routeContext.getCamelContext(), childProcessor);
         internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
 
         Expression correlation = 
definition.getExpression().createExpression(routeContext);
diff --git 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
 
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
index ab18213..b2d9a3c 100644
--- 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
+++ 
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
@@ -66,7 +66,7 @@ public class OnCompletionReifier extends 
ProcessorReifier<OnCompletionDefinition
         Processor childProcessor = this.createChildProcessor(routeContext, 
true);
 
         // wrap the on completion route in a unit of work processor
-        CamelInternalProcessor internal = new 
CamelInternalProcessor(childProcessor);
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(routeContext.getCamelContext(), childProcessor);
         internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
 
         routeContext.setOnCompletion(getId(definition, routeContext), 
internal);
diff --git 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
 
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index a2b7cd3..f9bdb54 100644
--- 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ 
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -289,7 +289,7 @@ public abstract class ProcessorReifier<T extends 
ProcessorDefinition<?>> extends
     protected Channel wrapChannel(RouteContext routeContext, Processor 
processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler) throws 
Exception {
         // put a channel in between this and each output to control the route
         // flow logic
-        DefaultChannel channel = new DefaultChannel();
+        DefaultChannel channel = new 
DefaultChannel(routeContext.getCamelContext());
 
         // add interceptor strategies to the channel must be in this order:
         // camel context, route context, local
diff --git 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
 
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
index 612e020..a1e33ef 100644
--- 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
+++ 
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
@@ -75,7 +75,7 @@ public class ResequenceReifier extends 
ProcessorReifier<ResequenceDefinition> {
         Expression expression = 
definition.getExpression().createExpression(routeContext);
 
         // and wrap in unit of work
-        CamelInternalProcessor internal = new 
CamelInternalProcessor(processor);
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(routeContext.getCamelContext(), processor);
         internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
 
         ObjectHelper.notNull(config, "config", this);
@@ -108,7 +108,7 @@ public class ResequenceReifier extends 
ProcessorReifier<ResequenceDefinition> {
         Processor processor = this.createChildProcessor(routeContext, true);
         Expression expression = 
definition.getExpression().createExpression(routeContext);
 
-        CamelInternalProcessor internal = new 
CamelInternalProcessor(processor);
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(routeContext.getCamelContext(), processor);
         internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
 
         ObjectHelper.notNull(config, "config", this);
diff --git 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java
 
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index 7a89b1a..1f16c8d 100644
--- 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ 
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -54,7 +54,7 @@ public class WireTapReifier extends 
ToDynamicReifier<WireTapDefinition<?>> {
         Processor target = wrapInErrorHandler(routeContext, dynamicTo);
 
         // and wrap in unit of work
-        CamelInternalProcessor internal = new CamelInternalProcessor(target);
+        CamelInternalProcessor internal = new 
CamelInternalProcessor(routeContext.getCamelContext(), target);
         internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
 
         // is true by default

Reply via email to