This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 074cdce530367dddae229b72e9ab4cabe5b7073b Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jan 24 13:16:35 2020 +0100 CAMEL-14435: camel-core - Optimize routing engine --- .../camel/impl/engine/DefaultRouteContext.java | 2 +- .../camel/impl/engine/DefaultUnitOfWork.java | 9 +++-- .../impl/engine/SubscribeMethodProcessor.java | 2 +- .../camel/processor/CamelInternalProcessor.java | 43 ++++++---------------- .../apache/camel/processor/MulticastProcessor.java | 2 +- .../apache/camel/processor/UnitOfWorkProducer.java | 2 +- .../camel/processor/channel/DefaultChannel.java | 13 +++++-- .../org/apache/camel/reifier/AggregateReifier.java | 2 +- .../apache/camel/reifier/OnCompletionReifier.java | 2 +- .../org/apache/camel/reifier/ProcessorReifier.java | 2 +- .../apache/camel/reifier/ResequenceReifier.java | 4 +- .../org/apache/camel/reifier/WireTapReifier.java | 2 +- 12 files changed, 36 insertions(+), 49 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java index 282fbd5..e7739d6 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java @@ -176,7 +176,7 @@ public class DefaultRouteContext implements RouteContext { Processor target = new Pipeline(getCamelContext(), eventDrivenProcessors); // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW - CamelInternalProcessor internal = new CamelInternalProcessor(target); + CamelInternalProcessor internal = new CamelInternalProcessor(getCamelContext(), target); internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this)); // and then optionally add route policy processor if a custom policy is set diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index deba2d8..e2d69dd 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -33,6 +33,7 @@ import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.Service; +import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.SynchronizationVetoable; @@ -60,6 +61,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { private String id; private final Logger log; private final CamelContext context; + private final InflightRepository inflightRepository; private RouteContext prevRouteContext; private RouteContext routeContext; private List<Synchronization> synchronizations; @@ -76,6 +78,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { log.trace("UnitOfWork created for ExchangeId: {} with {}", exchange.getExchangeId(), exchange); } context = exchange.getContext(); + inflightRepository = exchange.getContext().getInflightRepository(); if (context.isAllowUseOriginalMessage()) { // special for JmsMessage as it can cause it to loose headers later. @@ -112,7 +115,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { } // register to inflight registry - context.getInflightRepository().add(exchange); + inflightRepository.add(exchange); } UnitOfWork newInstance(Exchange exchange) { @@ -206,9 +209,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, log); // unregister from inflight registry, before signalling we are done - if (exchange.getContext() != null) { - exchange.getContext().getInflightRepository().remove(exchange); - } + inflightRepository.remove(exchange); // then fire event to signal the exchange is done try { diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java index b010417..14d2fc9 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java @@ -57,7 +57,7 @@ public final class SubscribeMethodProcessor extends AsyncProcessorSupport implem Processor answer = endpoint.getCamelContext().adapt(ExtendedCamelContext.class) .getBeanProcessorFactory().createBeanProcessor(endpoint.getCamelContext(), pojo, method); // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked - CamelInternalProcessor internal = new CamelInternalProcessor(answer); + CamelInternalProcessor internal = new CamelInternalProcessor(endpoint.getCamelContext(), answer); internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); Predicate p; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index bcd767b..527a972 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -92,42 +92,29 @@ import org.slf4j.LoggerFactory; * <p/> * The added advices can implement {@link Ordered} to control in which order the advices are executed. */ -public class CamelInternalProcessor extends DelegateAsyncProcessor implements CamelContextAware { +public class CamelInternalProcessor extends DelegateAsyncProcessor { private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class); private static final Object[] EMPTY_STATES = new Object[0]; - private CamelContext camelContext; - private ReactiveExecutor reactiveExecutor; - private ShutdownStrategy shutdownStrategy; + private final CamelContext camelContext; + private final ReactiveExecutor reactiveExecutor; + private final ShutdownStrategy shutdownStrategy; private final List<CamelInternalProcessorAdvice<?>> advices = new ArrayList<>(); private byte statefulAdvices; - public CamelInternalProcessor() { + public CamelInternalProcessor(CamelContext camelContext) { + this.camelContext = camelContext; + this.reactiveExecutor = camelContext.getReactiveExecutor(); + this.shutdownStrategy = camelContext.getShutdownStrategy(); } - public CamelInternalProcessor(Processor processor) { + public CamelInternalProcessor(CamelContext camelContext, Processor processor) { super(processor); - } - - public CamelContext getCamelContext() { - return camelContext; - } - - public void setCamelContext(CamelContext camelContext) { this.camelContext = camelContext; - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - - // optimize to preset reactive executor - if (camelContext != null) { - reactiveExecutor = camelContext.getReactiveExecutor(); - shutdownStrategy = camelContext.getShutdownStrategy(); - } + this.reactiveExecutor = camelContext.getReactiveExecutor(); + this.shutdownStrategy = camelContext.getShutdownStrategy(); } /** @@ -184,14 +171,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements Ca return true; } - // TODO: should not be needed - if (reactiveExecutor == null) { - reactiveExecutor = exchange.getContext().getReactiveExecutor(); - } - if (shutdownStrategy == null) { - shutdownStrategy = exchange.getContext().getShutdownStrategy(); - } - // optimise to use object array for states, and only for the number of advices that keep state final Object[] states = statefulAdvices > 0 ? new Object[statefulAdvices] : EMPTY_STATES; // optimise for loop using index access to avoid creating iterator object diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 2f4b637..7a4c0bd 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -765,7 +765,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat * @return the unit of work processor */ protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) { - CamelInternalProcessor internal = new CamelInternalProcessor(processor); + CamelInternalProcessor internal = new CamelInternalProcessor(exchange.getContext(), processor); // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java index bd92cab..fecac02 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java @@ -41,7 +41,7 @@ public final class UnitOfWorkProducer extends DefaultAsyncProducer { super(producer.getEndpoint()); this.producer = producer; // wrap in unit of work - CamelInternalProcessor internal = new CamelInternalProcessor(producer); + CamelInternalProcessor internal = new CamelInternalProcessor(producer.getEndpoint().getCamelContext(), producer); internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); this.processor = internal; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java index 92ef735..5034a07 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java @@ -70,6 +70,10 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { private RouteContext routeContext; private boolean routeScoped = true; + public DefaultChannel(CamelContext camelContext) { + super(camelContext); + } + @Override public Processor getOutput() { // the errorHandler is already decorated with interceptors @@ -133,7 +137,8 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { @Override protected void doStart() throws Exception { - super.doStart(); + // do not call super as we want to be in control here of the lifecycle + // the output has now been created, so assign the output as the processor setProcessor(getOutput()); ServiceHelper.startService(errorHandler, output); @@ -141,7 +146,8 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { @Override protected void doStop() throws Exception { - super.doStop(); + // do not call super as we want to be in control here of the lifecycle + if (isRouteScoped()) { // only stop services if not context scoped (as context scoped is reused by others) ServiceHelper.stopService(output, errorHandler); @@ -150,7 +156,8 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { @Override protected void doShutdown() throws Exception { - super.doShutdown(); + // do not call super as we want to be in control here of the lifecycle + ServiceHelper.stopAndShutdownServices(output, errorHandler); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java index 5fb0845..791747e 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java @@ -52,7 +52,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { Processor childProcessor = this.createChildProcessor(routeContext, true); // wrap the aggregate route in a unit of work processor - CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); + CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), childProcessor); internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); Expression correlation = definition.getExpression().createExpression(routeContext); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java index ab18213..b2d9a3c 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java @@ -66,7 +66,7 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition Processor childProcessor = this.createChildProcessor(routeContext, true); // wrap the on completion route in a unit of work processor - CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); + CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), childProcessor); internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); routeContext.setOnCompletion(getId(definition, routeContext), internal); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index a2b7cd3..f9bdb54 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -289,7 +289,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends protected Channel wrapChannel(RouteContext routeContext, Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler) throws Exception { // put a channel in between this and each output to control the route // flow logic - DefaultChannel channel = new DefaultChannel(); + DefaultChannel channel = new DefaultChannel(routeContext.getCamelContext()); // add interceptor strategies to the channel must be in this order: // camel context, route context, local diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java index 612e020..a1e33ef 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java @@ -75,7 +75,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> { Expression expression = definition.getExpression().createExpression(routeContext); // and wrap in unit of work - CamelInternalProcessor internal = new CamelInternalProcessor(processor); + CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), processor); internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); ObjectHelper.notNull(config, "config", this); @@ -108,7 +108,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> { Processor processor = this.createChildProcessor(routeContext, true); Expression expression = definition.getExpression().createExpression(routeContext); - CamelInternalProcessor internal = new CamelInternalProcessor(processor); + CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), processor); internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); ObjectHelper.notNull(config, "config", this); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java index 7a89b1a..1f16c8d 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java @@ -54,7 +54,7 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { Processor target = wrapInErrorHandler(routeContext, dynamicTo); // and wrap in unit of work - CamelInternalProcessor internal = new CamelInternalProcessor(target); + CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), target); internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); // is true by default