This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7a58163400bf15ba7959f567aa67dc584316dcb6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jan 24 21:07:47 2020 +0100 CAMEL-14438: camel-core - Optimize core for checking for stop routing --- core/camel-api/src/main/java/org/apache/camel/Exchange.java | 13 +++++++++++++ .../org/apache/camel/processor/CamelInternalProcessor.java | 10 +++------- .../org/apache/camel/processor/OnCompletionProcessor.java | 7 +++---- .../src/main/java/org/apache/camel/processor/Pipeline.java | 12 ++++-------- .../java/org/apache/camel/processor/PipelineHelper.java | 12 ++++-------- .../java/org/apache/camel/processor/RestBindingAdvice.java | 12 ++++++------ .../camel/processor/SharedCamelInternalProcessor.java | 10 +++------- .../main/java/org/apache/camel/processor/StopProcessor.java | 2 +- .../main/java/org/apache/camel/processor/TryProcessor.java | 10 +++------- .../processor/errorhandler/RedeliveryErrorHandler.java | 4 ++-- .../test/java/org/apache/camel/processor/RouteStopTest.java | 2 +- .../org/apache/camel/processor/SamplingThrottlerTest.java | 4 ++-- .../main/java/org/apache/camel/support/DefaultExchange.java | 12 ++++++++++++ 13 files changed, 57 insertions(+), 53 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index 161932f..00facf3 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -209,6 +209,7 @@ public interface Exchange { String REST_HTTP_QUERY = "CamelRestHttpQuery"; String ROLLBACK_ONLY = "CamelRollbackOnly"; String ROLLBACK_ONLY_LAST = "CamelRollbackOnlyLast"; + @Deprecated String ROUTE_STOP = "CamelRouteStop"; String REUSE_SCRIPT_ENGINE = "CamelReuseScripteEngine"; @@ -520,6 +521,18 @@ public interface Exchange { boolean isTransacted(); /** + * Returns true if this exchange is marked to stop and not continue routing. + */ + boolean isRouteStop(); + + /** + * Sets whether this exchange is marked to stop and not continue routing. + * + * @param routeStop <tt>true</tt> to stop routing + */ + void setRouteStop(boolean routeStop); + + /** * Returns true if this exchange is an external initiated redelivered message (such as a JMS broker). * <p/> * <b>Important: </b> It is not always possible to determine if the message is a redelivery diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 783fd2e..7577fb5 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -287,13 +287,9 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { * Strategy to determine if we should continue processing the {@link Exchange}. */ private boolean continueProcessing(Exchange exchange) { - Object stop = exchange.getProperty(Exchange.ROUTE_STOP); - if (stop != null) { - boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); - if (doStop) { - LOG.debug("Exchange is marked to stop routing: {}", exchange); - return false; - } + if (exchange.isRouteStop()) { + LOG.debug("Exchange is marked to stop routing: {}", exchange); + return false; } // determine if we can still run, or the camel context is forcing a shutdown diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index 6cdd4f6..1147ecb 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -147,7 +147,8 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac // must remember some properties which we cannot use during onCompletion processing // as otherwise we may cause issues // but keep the caused exception stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange - Object stop = exchange.removeProperty(Exchange.ROUTE_STOP); + boolean stop = exchange.isRouteStop(); + exchange.setRouteStop(false); Object failureHandled = exchange.removeProperty(Exchange.FAILURE_HANDLED); Object errorhandlerHandled = exchange.removeProperty(Exchange.ERRORHANDLER_HANDLED); Object rollbackOnly = exchange.removeProperty(Exchange.ROLLBACK_ONLY); @@ -164,9 +165,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac exchange.setException(e); } finally { // restore the options - if (stop != null) { - exchange.setProperty(Exchange.ROUTE_STOP, stop); - } + exchange.setRouteStop(stop); if (failureHandled != null) { exchange.setProperty(Exchange.FAILURE_HANDLED, failureHandled); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java index 350c5e5..0fe1ef6 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java @@ -128,15 +128,11 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo } protected boolean continueRouting(List<AsyncProcessor> list, AtomicInteger index, Exchange exchange) { - Object stop = exchange.getProperty(Exchange.ROUTE_STOP); - if (stop != null) { - boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); - if (doStop) { - if (LOG.isDebugEnabled()) { - LOG.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange); - } - return false; + if (exchange.isRouteStop()) { + if (LOG.isDebugEnabled()) { + LOG.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange); } + return false; } // continue if there are more processors to route boolean answer = index.get() < list.size(); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PipelineHelper.java b/core/camel-base/src/main/java/org/apache/camel/processor/PipelineHelper.java index d29deba..8bfacea 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/PipelineHelper.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/PipelineHelper.java @@ -63,15 +63,11 @@ public final class PipelineHelper { } // check for stop - Object stop = exchange.getProperty(Exchange.ROUTE_STOP); - if (stop != null) { - boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, exchange, stop); - if (doStop) { - if (log.isDebugEnabled()) { - log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange); - } - return false; + if (exchange.isRouteStop()) { + if (log.isDebugEnabled()) { + log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange); } + return false; } return true; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RestBindingAdvice.java b/core/camel-base/src/main/java/org/apache/camel/processor/RestBindingAdvice.java index 0e99e3f..945cffd 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RestBindingAdvice.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RestBindingAdvice.java @@ -159,7 +159,7 @@ public class RestBindingAdvice implements CamelInternalProcessorAdvice<Map<Strin String method = exchange.getIn().getHeader(Exchange.HTTP_METHOD, String.class); if ("OPTIONS".equalsIgnoreCase(method)) { // for OPTIONS methods then we should not route at all as its part of CORS - exchange.setProperty(Exchange.ROUTE_STOP, true); + exchange.setRouteStop(true); return true; } return false; @@ -211,7 +211,7 @@ public class RestBindingAdvice implements CamelInternalProcessorAdvice<Map<Strin // set empty response body as http error code indicate the problem exchange.getMessage().setBody(null); // stop routing and return - exchange.setProperty(Exchange.ROUTE_STOP, true); + exchange.setRouteStop(true); return; } @@ -223,7 +223,7 @@ public class RestBindingAdvice implements CamelInternalProcessorAdvice<Map<Strin // set empty response body as http error code indicate the problem exchange.getMessage().setBody(null); // stop routing and return - exchange.setProperty(Exchange.ROUTE_STOP, true); + exchange.setRouteStop(true); return; } } @@ -278,7 +278,7 @@ public class RestBindingAdvice implements CamelInternalProcessorAdvice<Map<Strin exchange.getMessage().setHeader(Exchange.HTTP_RESPONSE_CODE, 400); exchange.getMessage().setBody("The request body is missing."); // stop routing and return - exchange.setProperty(Exchange.ROUTE_STOP, true); + exchange.setRouteStop(true); return; } } @@ -287,7 +287,7 @@ public class RestBindingAdvice implements CamelInternalProcessorAdvice<Map<Strin exchange.getMessage().setHeader(Exchange.HTTP_RESPONSE_CODE, 400); exchange.getMessage().setBody("Some of the required query parameters are missing."); // stop routing and return - exchange.setProperty(Exchange.ROUTE_STOP, true); + exchange.setRouteStop(true); return; } if (requiredHeaders != null && !exchange.getIn().getHeaders().keySet().containsAll(requiredHeaders)) { @@ -295,7 +295,7 @@ public class RestBindingAdvice implements CamelInternalProcessorAdvice<Map<Strin exchange.getMessage().setHeader(Exchange.HTTP_RESPONSE_CODE, 400); exchange.getMessage().setBody("Some of the required HTTP headers are missing."); // stop routing and return - exchange.setProperty(Exchange.ROUTE_STOP, true); + exchange.setRouteStop(true); return; } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java index 5e23568..73baa86 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java @@ -295,13 +295,9 @@ public class SharedCamelInternalProcessor { * Strategy to determine if we should continue processing the {@link Exchange}. */ protected boolean continueProcessing(Exchange exchange, AsyncProcessor processor) { - Object stop = exchange.getProperty(Exchange.ROUTE_STOP); - if (stop != null) { - boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); - if (doStop) { - LOG.debug("Exchange is marked to stop routing: {}", exchange); - return false; - } + if (exchange.isRouteStop()) { + LOG.debug("Exchange is marked to stop routing: {}", exchange); + return false; } // determine if we can still run, or the camel context is forcing a shutdown diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java index 5c99fc8..975317b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java @@ -33,7 +33,7 @@ public class StopProcessor extends AsyncProcessorSupport implements IdAware, Rou @Override public boolean process(Exchange exchange, AsyncCallback callback) { // mark the exchange to stop continue routing - exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); + exchange.setRouteStop(true); callback.done(true); return true; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java index cd08f60..c13baaa 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java @@ -119,13 +119,9 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc } protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) { - Object stop = exchange.getProperty(Exchange.ROUTE_STOP); - if (stop != null) { - boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); - if (doStop) { - LOG.debug("Exchange is marked to stop routing: {}", exchange); - return false; - } + if (exchange.isRouteStop()) { + LOG.debug("Exchange is marked to stop routing: {}", exchange); + return false; } // continue if there are more processors to route diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index e9bb3bb..f58c05c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -300,7 +300,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme if (ExchangeHelper.isInterrupted(exchange)) { // mark the exchange to stop continue routing when interrupted // as we do not want to continue routing (for example a task has been cancelled) - exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); + exchange.setRouteStop(true); answer = true; } @@ -472,7 +472,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme exchange.setException(e); // mark the exchange to stop continue routing when interrupted // as we do not want to continue routing (for example a task has been cancelled) - exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); + exchange.setRouteStop(true); reactiveExecutor.schedule(callback); } } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RouteStopTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RouteStopTest.java index 871d66a..bcd1680 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/RouteStopTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RouteStopTest.java @@ -83,7 +83,7 @@ public class RouteStopTest extends ContextTestSupport { .to("mock:result"); // END SNIPPET: e1 - from("direct:foo").to("mock:foo").setProperty(Exchange.ROUTE_STOP, constant("true")).to("mock:result"); + from("direct:foo").to("mock:foo").process(e -> e.setRouteStop(true)).to("mock:result"); } }; } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java index 2591447..3251e16 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java @@ -146,8 +146,8 @@ public class SamplingThrottlerTest extends ContextTestSupport { private void validateDroppedExchanges(List<Exchange> sentExchanges, int expectedNotDroppedCount) { int notDropped = 0; for (Exchange e : sentExchanges) { - Boolean stopped = e.getProperty(Exchange.ROUTE_STOP, Boolean.class); - if (stopped == null) { + boolean stopped = e.isRouteStop(); + if (!stopped) { notDropped++; } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index ac78c4d..1a62320 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -57,6 +57,7 @@ public final class DefaultExchange implements ExtendedExchange { private Boolean externalRedelivered; private String historyNodeId; private String historyNodeLabel; + private boolean routeStop; public DefaultExchange(CamelContext context) { this(context, ExchangePattern.InOnly); @@ -119,6 +120,7 @@ public final class DefaultExchange implements ExtendedExchange { } exchange.setException(getException()); + exchange.setRouteStop(isRouteStop()); // copy properties after body as body may trigger lazy init if (hasProperties()) { @@ -467,6 +469,16 @@ public final class DefaultExchange implements ExtendedExchange { } @Override + public boolean isRouteStop() { + return routeStop; + } + + @Override + public void setRouteStop(boolean routeStop) { + this.routeStop = routeStop; + } + + @Override public boolean isExternalRedelivered() { if (externalRedelivered == null) { // lets avoid adding methods to the Message API, so we use the