This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7d7e357ddba9913ee5e2958b842ba7e754a1b461 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Jan 25 17:41:58 2020 +0100 CAMEL-14354: camel-core - Optimize core for checking rollback --- .../cdi/transaction/TransactionErrorHandler.java | 5 +++-- .../camel/spring/spi/TransactionErrorHandler.java | 5 +++-- .../src/main/java/org/apache/camel/Exchange.java | 17 ++++++++++++++++ .../camel/processor/OnCompletionProcessor.java | 14 ++++++------- .../org/apache/camel/processor/PipelineHelper.java | 4 ++-- .../apache/camel/processor/RollbackProcessor.java | 4 ++-- .../ShareUnitOfWorkAggregationStrategy.java | 2 +- .../aggregate/UseLatestAggregationStrategy.java | 2 +- .../errorhandler/RedeliveryErrorHandler.java | 14 ++++++------- .../org/apache/camel/support/DefaultExchange.java | 23 ++++++++++++++++++++-- .../modules/ROOT/pages/camel-3x-upgrade-guide.adoc | 20 +++++++++++++++++-- 11 files changed, 81 insertions(+), 29 deletions(-) diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java index 5d5326b..f8813c9 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java @@ -157,8 +157,9 @@ public class TransactionErrorHandler extends ErrorHandlerSupport // if it was a local rollback only then remove its marker so outer // transaction wont see the marker - Boolean onlyLast = (Boolean) exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST); - if (onlyLast != null && onlyLast) { + boolean onlyLast = exchange.isRollbackOnlyLast(); + exchange.setRollbackOnlyLast(false); + if (onlyLast) { // we only want this logged at debug level if (LOG.isDebugEnabled()) { // log exception if there was a cause exception so we have the diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java index 6e3f20e..667ce74 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java @@ -157,8 +157,9 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler { } // if it was a local rollback only then remove its marker so outer transaction wont see the marker - Boolean onlyLast = (Boolean) exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST); - if (onlyLast != null && onlyLast) { + boolean onlyLast = exchange.isRollbackOnlyLast(); + exchange.setRollbackOnlyLast(false); + if (onlyLast) { // we only want this logged at debug level if (LOG.isDebugEnabled()) { // log exception if there was a cause exception so we have the stack trace diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index 00facf3..4bb713e 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -207,7 +207,9 @@ public interface Exchange { String REDELIVERY_DELAY = "CamelRedeliveryDelay"; String REST_HTTP_URI = "CamelRestHttpUri"; String REST_HTTP_QUERY = "CamelRestHttpQuery"; + @Deprecated String ROLLBACK_ONLY = "CamelRollbackOnly"; + @Deprecated String ROLLBACK_ONLY_LAST = "CamelRollbackOnlyLast"; @Deprecated String ROUTE_STOP = "CamelRouteStop"; @@ -549,6 +551,21 @@ public interface Exchange { boolean isRollbackOnly(); /** + * Sets whether to mark this exchange for rollback + */ + void setRollbackOnly(boolean rollbackOnly); + + /** + * Returns true if this exchange is marked for rollback (only last transaction section) + */ + boolean isRollbackOnlyLast(); + + /** + * Sets whether to mark this exchange for rollback (only last transaction section) + */ + void setRollbackOnlyLast(boolean rollbackOnlyLast); + + /** * Returns the container so that a processor can resolve endpoints from URIs * * @return the container which owns this exchange diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index 1147ecb..0484619 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -151,8 +151,10 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac exchange.setRouteStop(false); Object failureHandled = exchange.removeProperty(Exchange.FAILURE_HANDLED); Object errorhandlerHandled = exchange.removeProperty(Exchange.ERRORHANDLER_HANDLED); - Object rollbackOnly = exchange.removeProperty(Exchange.ROLLBACK_ONLY); - Object rollbackOnlyLast = exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST); + boolean rollbackOnly = exchange.isRollbackOnly(); + exchange.setRollbackOnly(false); + boolean rollbackOnlyLast = exchange.isRollbackOnlyLast(); + exchange.setRollbackOnlyLast(false); // and we should not be regarded as exhausted as we are in a onCompletion block Object exhausted = exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); @@ -172,12 +174,8 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac if (errorhandlerHandled != null) { exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, errorhandlerHandled); } - if (rollbackOnly != null) { - exchange.setProperty(Exchange.ROLLBACK_ONLY, rollbackOnly); - } - if (rollbackOnlyLast != null) { - exchange.setProperty(Exchange.ROLLBACK_ONLY_LAST, rollbackOnlyLast); - } + exchange.setRollbackOnly(rollbackOnly); + exchange.setRollbackOnlyLast(rollbackOnlyLast); if (exhausted != null) { exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhausted); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PipelineHelper.java b/core/camel-base/src/main/java/org/apache/camel/processor/PipelineHelper.java index 8bfacea..6d7308e 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/PipelineHelper.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/PipelineHelper.java @@ -41,13 +41,13 @@ public final class PipelineHelper { public static boolean continueProcessing(Exchange exchange, String message, Logger log) { // check for error if so we should break out boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(exchange); - if (exchange.isFailed() || exchange.isRollbackOnly() || exceptionHandled) { + if (exchange.isFailed() || exchange.isRollbackOnly() || exchange.isRollbackOnlyLast() || exceptionHandled) { // The Exchange.ERRORHANDLED_HANDLED property is only set if satisfactory handling was done // by the error handler. It's still an exception, the exchange still failed. if (log.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("Message exchange has failed: ").append(message).append(" for exchange: ").append(exchange); - if (exchange.isRollbackOnly()) { + if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) { sb.append(" Marked as rollback only."); } if (exchange.getException() != null) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java index d6ed446..af30e1b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java @@ -47,10 +47,10 @@ public class RollbackProcessor extends AsyncProcessorSupport implements Traceabl if (isMarkRollbackOnlyLast()) { // only mark the last route (current) as rollback // this is needed when you have multiple transactions in play - exchange.setProperty(Exchange.ROLLBACK_ONLY_LAST, Boolean.TRUE); + exchange.setRollbackOnlyLast(true); } else { // default to mark the entire route as rollback - exchange.setProperty(Exchange.ROLLBACK_ONLY, Boolean.TRUE); + exchange.setRollbackOnly(true); } if (markRollbackOnly || markRollbackOnlyLast) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java index b367070..b72954a 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java @@ -107,7 +107,7 @@ public final class ShareUnitOfWorkAggregationStrategy extends ServiceSupport imp protected void propagateFailure(Exchange answer, Exchange newExchange) { // if new exchange failed then propagate all the error related properties to the answer boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(newExchange); - if (newExchange.isFailed() || newExchange.isRollbackOnly() || exceptionHandled) { + if (newExchange.isFailed() || newExchange.isRollbackOnly() || newExchange.isRollbackOnlyLast() || exceptionHandled) { if (newExchange.getException() != null) { answer.setException(newExchange.getException()); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java index ea71d18..9f14eae 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java @@ -72,7 +72,7 @@ public class UseLatestAggregationStrategy implements AggregationStrategy { // propagate exception from old exchange if there isn't already an exception boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(oldExchange); - if (oldExchange.isFailed() || oldExchange.isRollbackOnly() || exceptionHandled) { + if (oldExchange.isFailed() || oldExchange.isRollbackOnly() || oldExchange.isRollbackOnlyLast() || exceptionHandled) { // propagate failure by using old exchange as the answer return oldExchange; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index f58c05c..c6c7179 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -582,7 +582,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // we continue so clear any exceptions exchange.setException(null); // clear rollback flags - exchange.setProperty(Exchange.ROLLBACK_ONLY, null); + exchange.setRollbackOnly(false); // reset cached streams so they can be read again MessageHelper.resetStreamCache(exchange.getIn()); @@ -613,7 +613,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme exchange.setException(null); // clear rollback flags - exchange.setProperty(Exchange.ROLLBACK_ONLY, null); + exchange.setRollbackOnly(false); // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange // and then put these on the exchange when doing a redelivery / fault processor @@ -783,7 +783,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); // and remove traces of rollback only and uow exhausted markers - exchange.removeProperty(Exchange.ROLLBACK_ONLY); + exchange.setRollbackOnly(false); exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED); handled = true; @@ -981,7 +981,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme return; } - if (!exchange.isRollbackOnly()) { + if (!exchange.isRollbackOnly() && !exchange.isRollbackOnlyLast()) { if (newException && !currentRedeliveryPolicy.isLogNewException()) { // do not log new exception return; @@ -1024,7 +1024,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme LoggingLevel newLogLevel; boolean logStackTrace; - if (exchange.isRollbackOnly()) { + if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) { newLogLevel = currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); logStackTrace = currentRedeliveryPolicy.isLogStackTrace(); } else if (shouldRedeliver) { @@ -1058,7 +1058,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme } else { logger.log(msg, newLogLevel); } - } else if (exchange.isRollbackOnly()) { + } else if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) { String msg = "Rollback " + ExchangeHelper.logIds(exchange); Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); if (cause != null) { @@ -1122,7 +1122,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme } // if marked as rollback only then do not continue/redeliver - boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class); + boolean rollbackOnly = exchange.isRollbackOnly(); if (rollbackOnly) { LOG.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange); return true; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index 1a62320..8beb3da 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -58,6 +58,8 @@ public final class DefaultExchange implements ExtendedExchange { private String historyNodeId; private String historyNodeLabel; private boolean routeStop; + private boolean rollbackOnly; + private boolean rollbackOnlyLast; public DefaultExchange(CamelContext context) { this(context, ExchangePattern.InOnly); @@ -121,6 +123,8 @@ public final class DefaultExchange implements ExtendedExchange { exchange.setException(getException()); exchange.setRouteStop(isRouteStop()); + exchange.setRollbackOnly(isRollbackOnly()); + exchange.setRollbackOnlyLast(isRollbackOnlyLast()); // copy properties after body as body may trigger lazy init if (hasProperties()) { @@ -475,7 +479,7 @@ public final class DefaultExchange implements ExtendedExchange { @Override public void setRouteStop(boolean routeStop) { - this.routeStop = routeStop; + this. routeStop = routeStop; } @Override @@ -498,7 +502,22 @@ public final class DefaultExchange implements ExtendedExchange { @Override public boolean isRollbackOnly() { - return Boolean.TRUE.equals(getProperty(Exchange.ROLLBACK_ONLY)) || Boolean.TRUE.equals(getProperty(Exchange.ROLLBACK_ONLY_LAST)); + return rollbackOnly; + } + + @Override + public void setRollbackOnly(boolean rollbackOnly) { + this.rollbackOnly = rollbackOnly; + } + + @Override + public boolean isRollbackOnlyLast() { + return rollbackOnlyLast; + } + + @Override + public void setRollbackOnlyLast(boolean rollbackOnlyLast) { + this.rollbackOnlyLast = rollbackOnlyLast; } @Override diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide.adoc index 945dac0..d9297488 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide.adoc @@ -191,8 +191,8 @@ The package `org.apache.camel.http.common.cookie` is renamed to `org.apache.came ==== Exchange.ROUTE_STOP -To signal an `Exchange` to stop continue routing has changed from setting the exchange property `Echange.ROUTE_STOP` to true. -Instead you should now use the `setRouteStop` method on the `Exchange` API +To signal an `Exchange` to stop continue routing has changed from setting the exchange property `Exchange.ROUTE_STOP` to true. +Instead you should now use the `setRouteStop` method on the `Exchange` API. [source,java] ---- @@ -205,6 +205,22 @@ Should now be: exchange.setRouteStop(true); ---- +==== Exchange.ROLLBACK_ONLY and Exchange.ROLLBACK_ONLY_LAST + +To signal an `Exchange` to rollback a transaction has changed from setting the exchange property `Exchange.ROLLBACK_ONLY` to true. +Instead you should now use the `setRollbackOnly` method on the `Exchange` API (the same for rollback only last). + +[source,java] +---- + exchange.setProperty(Exchange.ROLLBACK_ONLY, Boolean.TRUE); +---- + +Should now be: + +[source,java] +---- + exchange.setRollbackOnly(true); +---- ==== Java DSL