This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 9075630 CAMEL-14354: camel-core optimize 9075630 is described below commit 9075630f0942e8bf63627bf32020cc230ccbe329 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jan 27 19:40:19 2020 +0100 CAMEL-14354: camel-core optimize --- .../src/main/java/org/apache/camel/Exchange.java | 1 + .../java/org/apache/camel/ExtendedExchange.java | 10 ++++++ .../org/apache/camel/processor/CatchProcessor.java | 5 +-- .../apache/camel/processor/MulticastProcessor.java | 6 ++-- .../camel/processor/OnCompletionProcessor.java | 41 +++++++++++----------- .../org/apache/camel/processor/PollEnricher.java | 2 +- .../processor/aggregate/AggregateProcessor.java | 2 +- .../errorhandler/RedeliveryErrorHandler.java | 14 ++++---- .../camel/builder/NoErrorHandlerBuilder.java | 3 +- .../errorhandler/NoErrorHandlerReifier.java | 3 +- .../BridgeExceptionHandlerToErrorHandler.java | 3 +- .../org/apache/camel/support/DefaultExchange.java | 22 +++++++++--- .../org/apache/camel/support/ExchangeHelper.java | 10 ------ 13 files changed, 72 insertions(+), 50 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index 364722f..5d6991d 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -205,6 +205,7 @@ public interface Exchange { String REDELIVERED = "CamelRedelivered"; String REDELIVERY_COUNTER = "CamelRedeliveryCounter"; String REDELIVERY_MAX_COUNTER = "CamelRedeliveryMaxCounter"; + @Deprecated String REDELIVERY_EXHAUSTED = "CamelRedeliveryExhausted"; String REDELIVERY_DELAY = "CamelRedeliveryDelay"; String REST_HTTP_URI = "CamelRestHttpUri"; diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java index e3681e9..582102c 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java @@ -118,4 +118,14 @@ public interface ExtendedExchange extends Exchange { */ void setInterrupted(boolean interrupted); + /** + * Whether the exchange has exhausted (attempted all) its redeliveries and still failed. + */ + boolean isRedeliveryExhausted(); + + /** + * Used to signal that this exchange has exhausted (attempted all) its redeliveries and still failed. + */ + void setRedeliveryExhausted(boolean redeliveryExhausted); + } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java index b766fb9..b9b3329 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Traceable; @@ -102,7 +103,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); exchange.setException(null); // and we should not be regarded as exhausted as we are in a try .. catch block - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); if (LOG.isDebugEnabled()) { LOG.debug("The exception is handled for the exception: {} caused by: {}", @@ -118,7 +119,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, false, null); // always clear redelivery exhausted in a catch clause - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); if (!doneSync) { // signal callback to continue routing async diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index e9bb034..95af8ad 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -44,6 +44,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -528,7 +529,8 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat // also we would need to know if any error handler has attempted redelivery and exhausted boolean stoppedOnException = false; boolean exception = false; - boolean exhaust = forceExhaust || subExchange != null && (subExchange.getException() != null || ExchangeHelper.isRedeliveryExhausted(subExchange)); + ExtendedExchange see = (ExtendedExchange) subExchange; + boolean exhaust = forceExhaust || see != null && (see.getException() != null || see.isRedeliveryExhausted()); if (original.getException() != null || subExchange != null && subExchange.getException() != null) { // there was an exception and we stopped stoppedOnException = isStopOnException(); @@ -553,7 +555,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat // multicast uses error handling on its output processors and they have tried to redeliver // so we shall signal back to the other error handlers that we are exhausted and they should not // also try to redeliver as we would then do that twice - original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); + original.adapt(ExtendedExchange.class).setRedeliveryExhausted(exhaust); } reactiveExecutor.schedule(callback); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index 0484619..0b812f3 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -23,6 +23,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Ordered; import org.apache.camel.Predicate; @@ -144,22 +145,24 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac * @param exchange the exchange */ protected static void doProcess(Processor processor, Exchange exchange) { + ExtendedExchange ee = (ExtendedExchange) exchange; // must remember some properties which we cannot use during onCompletion processing // as otherwise we may cause issues // but keep the caused exception stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange - boolean stop = exchange.isRouteStop(); - exchange.setRouteStop(false); - Object failureHandled = exchange.removeProperty(Exchange.FAILURE_HANDLED); - Object errorhandlerHandled = exchange.removeProperty(Exchange.ERRORHANDLER_HANDLED); - boolean rollbackOnly = exchange.isRollbackOnly(); - exchange.setRollbackOnly(false); - boolean rollbackOnlyLast = exchange.isRollbackOnlyLast(); - exchange.setRollbackOnlyLast(false); + boolean stop = ee.isRouteStop(); + ee.setRouteStop(false); + Object failureHandled = ee.removeProperty(Exchange.FAILURE_HANDLED); + Object errorhandlerHandled = ee.removeProperty(Exchange.ERRORHANDLER_HANDLED); + boolean rollbackOnly = ee.isRollbackOnly(); + ee.setRollbackOnly(false); + boolean rollbackOnlyLast = ee.isRollbackOnlyLast(); + ee.setRollbackOnlyLast(false); // and we should not be regarded as exhausted as we are in a onCompletion block - Object exhausted = exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + boolean exhausted = ee.adapt(ExtendedExchange.class).isRedeliveryExhausted(); + ee.setRedeliveryExhausted(false); - Exception cause = exchange.getException(); - exchange.setException(null); + Exception cause = ee.getException(); + ee.setException(null); try { processor.process(exchange); @@ -167,20 +170,18 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac exchange.setException(e); } finally { // restore the options - exchange.setRouteStop(stop); + ee.setRouteStop(stop); if (failureHandled != null) { - exchange.setProperty(Exchange.FAILURE_HANDLED, failureHandled); + ee.setProperty(Exchange.FAILURE_HANDLED, failureHandled); } if (errorhandlerHandled != null) { - exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, errorhandlerHandled); - } - exchange.setRollbackOnly(rollbackOnly); - exchange.setRollbackOnlyLast(rollbackOnlyLast); - if (exhausted != null) { - exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhausted); + ee.setProperty(Exchange.ERRORHANDLER_HANDLED, errorhandlerHandled); } + ee.setRollbackOnly(rollbackOnly); + ee.setRollbackOnlyLast(rollbackOnlyLast); + ee.setRedeliveryExhausted(exhausted); if (cause != null) { - exchange.setException(cause); + ee.setException(cause); } } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java index 1eeb601..44e8757 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -306,7 +306,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout // restore caused exception exchange.setException(cause); // remove the exhausted marker as we want to be able to perform redeliveries with the error handler - exchange.removeProperties(Exchange.REDELIVERY_EXHAUSTED); + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); // preserve the redelivery stats if (redeliveried != null) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 503e2c7..3146223 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -1381,7 +1381,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat try { // set redelivery counter exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); - exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true); deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange); } catch (Throwable e) { exchange.setException(e); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 8336458..073d292 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -268,7 +268,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme * Strategy to determine if the exchange is done so we can continue */ protected boolean isDone(Exchange exchange) { - if (((ExtendedExchange) exchange).isInterrupted()) { + ExtendedExchange ee = (ExtendedExchange) exchange; + if (ee.isInterrupted()) { // mark the exchange to stop continue routing when interrupted // as we do not want to continue routing (for example a task has been cancelled) if (LOG.isTraceEnabled()) { @@ -283,7 +284,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // or we are exhausted boolean answer = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange) - || ExchangeHelper.isRedeliveryExhausted(exchange); + || ee.isRedeliveryExhausted(); if (LOG.isTraceEnabled()) { LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer); @@ -442,7 +443,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // the task was rejected exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping")); // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange - exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true); // jump to start of loop which then detects that we are failed and exhausted reactiveExecutor.schedule(this); } else { @@ -766,7 +767,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme exchange.getIn().removeHeader(Exchange.REDELIVERED); exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); // and remove traces of rollback only and uow exhausted markers exchange.setRollbackOnly(false); @@ -1104,15 +1105,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust. */ private boolean isExhausted(Exchange exchange) { + ExtendedExchange ee = (ExtendedExchange) exchange; // if marked as rollback only then do not continue/redeliver - boolean exhausted = ExchangeHelper.isRedeliveryExhausted(exchange); + boolean exhausted = ee.isRedeliveryExhausted(); if (exhausted) { LOG.trace("This exchange is marked as redelivery exhausted: {}", exchange); return true; } // if marked as rollback only then do not continue/redeliver - boolean rollbackOnly = exchange.isRollbackOnly(); + boolean rollbackOnly = ee.isRollbackOnly(); if (rollbackOnly) { LOG.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange); return true; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java index 6ccd34a..3565d35 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java @@ -18,6 +18,7 @@ package org.apache.camel.builder; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.spi.RouteContext; import org.apache.camel.support.processor.DelegateAsyncProcessor; @@ -40,7 +41,7 @@ public class NoErrorHandlerBuilder extends ErrorHandlerBuilderSupport { return super.process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); callback.done(doneSync); } }); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/NoErrorHandlerReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/NoErrorHandlerReifier.java index 3beae5f..118297b 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/NoErrorHandlerReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/NoErrorHandlerReifier.java @@ -19,6 +19,7 @@ package org.apache.camel.reifier.errorhandler; import org.apache.camel.AsyncCallback; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.NoErrorHandlerBuilder; import org.apache.camel.spi.RouteContext; @@ -38,7 +39,7 @@ public class NoErrorHandlerReifier extends ErrorHandlerReifier<NoErrorHandlerBui return super.process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); callback.done(doneSync); } }); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java index 5ca068a..4e027e6 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java @@ -17,6 +17,7 @@ package org.apache.camel.support; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.UnitOfWork; @@ -69,7 +70,7 @@ public class BridgeExceptionHandlerToErrorHandler implements ExceptionHandler { // and the message exchange.getIn().setBody(message); // and mark as redelivery exhausted as we cannot do redeliveries - exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true); // wrap in UoW UnitOfWork uow = null; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index 3961ec7..e46b54c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -63,6 +63,7 @@ public final class DefaultExchange implements ExtendedExchange { private boolean rollbackOnlyLast; private boolean notifyEvent; private boolean interrupted; + private boolean redeliveryExhausted; public DefaultExchange(CamelContext context) { this(context, ExchangePattern.InOnly); @@ -124,11 +125,12 @@ public final class DefaultExchange implements ExtendedExchange { } } - exchange.setException(getException()); - exchange.setRouteStop(isRouteStop()); - exchange.setRollbackOnly(isRollbackOnly()); - exchange.setRollbackOnlyLast(isRollbackOnlyLast()); - exchange.setNotifyEvent(isNotifyEvent()); + exchange.setException(exception); + exchange.setRouteStop(routeStop); + exchange.setRollbackOnly(rollbackOnly); + exchange.setRollbackOnlyLast(rollbackOnlyLast); + exchange.setNotifyEvent(notifyEvent); + exchange.setRedeliveryExhausted(redeliveryExhausted); // copy properties after body as body may trigger lazy init if (hasProperties()) { @@ -639,6 +641,16 @@ public final class DefaultExchange implements ExtendedExchange { this.interrupted = interrupted; } + @Override + public boolean isRedeliveryExhausted() { + return redeliveryExhausted; + } + + @Override + public void setRedeliveryExhausted(boolean redeliveryExhausted) { + this.redeliveryExhausted = redeliveryExhausted; + } + /** * Configures the message after it has been set on the exchange */ diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index 26e91be..6bf16ff 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -585,16 +585,6 @@ public final class ExchangeHelper { } /** - * Checks whether the exchange is redelivery exhausted - * - * @param exchange the exchange - * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise - */ - public static boolean isRedeliveryExhausted(Exchange exchange) { - return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); - } - - /** * Checks whether the exchange {@link UnitOfWork} is redelivered * * @param exchange the exchange