This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5e7f751fccb9da090f38f97cf623a753d9b87f2c Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Tue Nov 22 18:29:17 2022 +0100 CAMEL-15105: adapt the RedeliveryErrorHandler to the new extension interface --- .../errorhandler/RedeliveryErrorHandler.java | 66 ++++++++++------------ 1 file changed, 29 insertions(+), 37 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 407291b81bb..01335cb8acd 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -317,8 +317,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport * Strategy to determine if the exchange is done so we can continue */ protected boolean isDone(Exchange exchange) { - ExtendedExchange ee = (ExtendedExchange) exchange; - if (ee.isInterrupted()) { + if (exchange.getExchangeExtension().isInterrupted()) { // mark the exchange to stop continue routing when interrupted // as we do not want to continue routing (for example a task has been cancelled) if (LOG.isTraceEnabled()) { @@ -333,7 +332,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // or we are exhausted boolean answer = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange) - || ee.isRedeliveryExhausted(); + || exchange.getExchangeExtension().isRedeliveryExhausted(); if (LOG.isTraceEnabled()) { LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer); @@ -386,7 +385,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport * Simple task to perform calling the processor with no redelivery support */ protected class SimpleTask implements PooledExchangeTask, Runnable, AsyncCallback { - private ExtendedExchange exchange; + private Exchange exchange; private AsyncCallback callback; private boolean first; @@ -394,7 +393,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } public void prepare(Exchange exchange, AsyncCallback callback) { - this.exchange = (ExtendedExchange) exchange; + this.exchange = exchange; this.callback = callback; this.first = true; } @@ -439,7 +438,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport cb.done(false); return; } - if (exchange.isInterrupted()) { + if (exchange.getExchangeExtension().isInterrupted()) { // mark the exchange to stop continue routing when interrupted // as we do not want to continue routing (for example a task has been cancelled) if (LOG.isTraceEnabled()) { @@ -457,7 +456,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // and it has not been handled by the failure processor before // or not exhausted boolean failure = exchange.getException() != null - && !exchange.isRedeliveryExhausted() + && !exchange.getExchangeExtension().isRedeliveryExhausted() && !ExchangeHelper.isFailureHandled(exchange); // error handled bridged boolean bridge = ExchangeHelper.isErrorHandlerBridge(exchange); @@ -533,15 +532,13 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } protected void prepareExchangeAfterFailure(final Exchange exchange) { - ExtendedExchange ee = (ExtendedExchange) exchange; - // we could not process the exchange so we let the failure processor handled it ExchangeHelper.setFailureHandled(exchange); // honor if already set a handling - boolean alreadySet = ee.isErrorHandlerHandledSet(); + boolean alreadySet = exchange.getExchangeExtension().isErrorHandlerHandledSet(); if (alreadySet) { - boolean handled = ee.isErrorHandlerHandled(); + boolean handled = exchange.getExchangeExtension().isErrorHandlerHandled(); LOG.trace("This exchange has already been marked for handling: {}", handled); if (!handled) { // exception not handled, put exception back in the exchange @@ -558,23 +555,21 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } private void prepareExchangeAfterFailureNotHandled(Exchange exchange) { - ExtendedExchange ee = (ExtendedExchange) exchange; - - LOG.trace("This exchange is not handled or continued so its marked as failed: {}", ee); + LOG.trace("This exchange is not handled or continued so its marked as failed: {}", exchange); // exception not handled, put exception back in the exchange - ee.setErrorHandlerHandled(false); - ee.setException(exchange.getProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, Exception.class)); + exchange.getExchangeExtension().setErrorHandlerHandled(false); + exchange.setException(exchange.getProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, Exception.class)); // and put failure endpoint back as well - ee.setProperty(ExchangePropertyKey.FAILURE_ENDPOINT, ee.getProperty(ExchangePropertyKey.TO_ENDPOINT)); + exchange.setProperty(ExchangePropertyKey.FAILURE_ENDPOINT, exchange.getProperty(ExchangePropertyKey.TO_ENDPOINT)); // and store the route id, so we know in which route we failed - Route rc = ExchangeHelper.getRoute(ee); + Route rc = ExchangeHelper.getRoute(exchange); if (rc != null) { - ee.setProperty(ExchangePropertyKey.FAILURE_ROUTE_ID, rc.getRouteId()); + exchange.setProperty(ExchangePropertyKey.FAILURE_ROUTE_ID, rc.getRouteId()); } // create log message String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); - msg = msg + ". Exhausted after delivery attempt: 1 caught: " + ee.getException(); + msg = msg + ". Exhausted after delivery attempt: 1 caught: " + exchange.getException(); // log that we failed delivery as we are exhausted logFailedDelivery(exchange, msg, null); @@ -648,7 +643,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport protected class RedeliveryTask implements PooledExchangeTask, Runnable { // state private Exchange original; - private ExtendedExchange exchange; + private Exchange exchange; private AsyncCallback callback; private int redeliveryCounter; private long redeliveryDelay; @@ -744,7 +739,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport boolean exhausted = false; if (redeliverAllowed) { // we can redeliver but check if we are exhausted first (optimized to only check when needed) - exhausted = exchange.isRedeliveryExhausted() || exchange.isRollbackOnly(); + exhausted = exchange.getExchangeExtension().isRedeliveryExhausted() || exchange.isRollbackOnly(); if (!exhausted && redeliveryCounter > 0) { // its a potential redelivery so determine if we should redeliver or not redeliverAllowed @@ -793,7 +788,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // the task was rejected exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping")); // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true); + exchange.getExchangeExtension().setRedeliveryExhausted(true); // jump to start of loop which then detects that we are failed and exhausted reactiveExecutor.schedule(this); } else { @@ -1121,7 +1116,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport exchange.getIn().removeHeader(Exchange.REDELIVERED); exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); + exchange.getExchangeExtension().setRedeliveryExhausted(false); // and remove traces of rollback only and uow exhausted markers exchange.setRollbackOnly(false); @@ -1276,16 +1271,15 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport final Exchange exchange, final boolean isDeadLetterChannel, final boolean shouldHandle, final boolean shouldContinue) { - ExtendedExchange ee = (ExtendedExchange) exchange; Exception newException = exchange.getException(); // we could not process the exchange so we let the failure processor handled it ExchangeHelper.setFailureHandled(exchange); // honor if already set a handling - boolean alreadySet = ee.isErrorHandlerHandledSet(); + boolean alreadySet = exchange.getExchangeExtension().isErrorHandlerHandledSet(); if (alreadySet) { - boolean handled = ee.isErrorHandlerHandled(); + boolean handled = exchange.getExchangeExtension().isErrorHandlerHandled(); LOG.trace("This exchange has already been marked for handling: {}", handled); if (!handled) { // exception not handled, put exception back in the exchange @@ -1304,7 +1298,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport prepareExchangeForContinue(exchange, isDeadLetterChannel); } else if (shouldHandle) { LOG.trace("This exchange is handled so its marked as not failed: {}", exchange); - ee.setErrorHandlerHandled(true); + exchange.getExchangeExtension().setErrorHandlerHandled(true); } else { // okay the redelivery policy are not explicit set to true, so we should allow to check for some // special situations when using dead letter channel @@ -1331,7 +1325,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport if (handled) { LOG.trace("This exchange is handled so its marked as not failed: {}", exchange); - ee.setErrorHandlerHandled(true); + exchange.getExchangeExtension().setErrorHandlerHandled(true); return; } } @@ -1342,18 +1336,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } private void prepareExchangeAfterFailureNotHandled(Exchange exchange) { - ExtendedExchange ee = (ExtendedExchange) exchange; - - LOG.trace("This exchange is not handled or continued so its marked as failed: {}", ee); + LOG.trace("This exchange is not handled or continued so its marked as failed: {}", exchange); // exception not handled, put exception back in the exchange - ee.setErrorHandlerHandled(false); - ee.setException(exchange.getProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, Exception.class)); + exchange.getExchangeExtension().setErrorHandlerHandled(false); + exchange.setException(exchange.getProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, Exception.class)); // and put failure endpoint back as well - ee.setProperty(ExchangePropertyKey.FAILURE_ENDPOINT, ee.getProperty(ExchangePropertyKey.TO_ENDPOINT)); + exchange.setProperty(ExchangePropertyKey.FAILURE_ENDPOINT, exchange.getProperty(ExchangePropertyKey.TO_ENDPOINT)); // and store the route id so we know in which route we failed - String routeId = ExchangeHelper.getAtRouteId(ee); + String routeId = ExchangeHelper.getAtRouteId(exchange); if (routeId != null) { - ee.setProperty(ExchangePropertyKey.FAILURE_ROUTE_ID, routeId); + exchange.setProperty(ExchangePropertyKey.FAILURE_ROUTE_ID, routeId); } }