This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 976dd82879698f67f784457f5c3dec6067d8b3fc
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Jan 24 12:34:38 2020 +0100

    CAMEL-14435: camel-core - Optimize getting shutdown strategy in routing engine
---
 .../java/org/apache/camel/processor/CamelInternalProcessor.java  | 9 ++++++++-
 .../org/apache/camel/processor/SharedCamelInternalProcessor.java | 6 ++++--
 .../camel/processor/errorhandler/RedeliveryErrorHandler.java     | 5 ++++-
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 0f82c90..bcd767b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -47,6 +47,7 @@ import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.Tracer;
@@ -99,6 +100,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements Ca
 
     private CamelContext camelContext;
     private ReactiveExecutor reactiveExecutor;
+    private ShutdownStrategy shutdownStrategy;
     private final List<CamelInternalProcessorAdvice<?>> advices = new ArrayList<>();
     private byte statefulAdvices;
 
@@ -124,6 +126,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements Ca
         // optimize to preset reactive executor
         if (camelContext != null) {
             reactiveExecutor = camelContext.getReactiveExecutor();
+            shutdownStrategy = camelContext.getShutdownStrategy();
         }
     }
 
@@ -181,9 +184,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements Ca
             return true;
         }
 
+        // TODO: should not be needed
         if (reactiveExecutor == null) {
             reactiveExecutor = exchange.getContext().getReactiveExecutor();
         }
+        if (shutdownStrategy == null) {
+            shutdownStrategy = exchange.getContext().getShutdownStrategy();
+        }
 
         // optimise to use object array for states, and only for the number of advices that keep state
         final Object[] states = statefulAdvices > 0 ? new Object[statefulAdvices] : EMPTY_STATES;
@@ -312,7 +319,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements Ca
         }
 
         // determine if we can still run, or the camel context is forcing a shutdown
-        boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this);
+        boolean forceShutdown = shutdownStrategy.forceShutdown(this);
         if (forceShutdown) {
             String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
             LOG.debug(msg);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index c47fc50..37d457c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -17,7 +17,6 @@
 package org.apache.camel.processor;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
@@ -34,6 +33,7 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelInternalProcessorAdvice;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
@@ -75,6 +75,7 @@ public class SharedCamelInternalProcessor {
     private final CamelContext camelContext;
     private final ReactiveExecutor reactiveExecutor;
     private final AsyncProcessorAwaitManager awaitManager;
+    private final ShutdownStrategy shutdownStrategy;
     private final List<CamelInternalProcessorAdvice> advices;
     private byte statefulAdvices;
 
@@ -82,6 +83,7 @@ public class SharedCamelInternalProcessor {
         this.camelContext = camelContext;
         this.reactiveExecutor = camelContext.getReactiveExecutor();
         this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
+        this.shutdownStrategy = camelContext.getShutdownStrategy();
 
         if (advices != null) {
             this.advices = new ArrayList<>(advices.length);
@@ -304,7 +306,7 @@ public class SharedCamelInternalProcessor {
 
         // determine if we can still run, or the camel context is forcing a shutdown
         if (processor instanceof Service) {
-            boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown((Service) processor);
+            boolean forceShutdown = shutdownStrategy.forceShutdown((Service) processor);
             if (forceShutdown) {
                 String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
                 LOG.debug(msg);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 2fcaae2..32d61bc 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -42,6 +42,7 @@ import org.apache.camel.spi.ExchangeFormatter;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.ShutdownPrepared;
+import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -73,6 +74,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
     protected final CamelContext camelContext;
     protected final ReactiveExecutor reactiveExecutor;
     protected final AsyncProcessorAwaitManager awaitManager;
+    protected final ShutdownStrategy shutdownStrategy;
     protected final Processor deadLetter;
     protected final String deadLetterUri;
     protected final boolean deadLetterHandleNewException;
@@ -102,6 +104,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         this.camelContext = camelContext;
         this.reactiveExecutor = camelContext.getReactiveExecutor();
         this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
+        this.shutdownStrategy = camelContext.getShutdownStrategy();
         this.redeliveryProcessor = redeliveryProcessor;
         this.deadLetter = deadLetter;
         this.output = output;
@@ -494,7 +497,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
 
         protected boolean isRunAllowed() {
             // if camel context is forcing a shutdown then do not allow running
-            boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(RedeliveryErrorHandler.this);
+            boolean forceShutdown = shutdownStrategy.forceShutdown(RedeliveryErrorHandler.this);
             if (forceShutdown) {
                 LOG.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)");
                 return false;