This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit ceaffab948b9ab441b984a1fabaaf3731ba0996e
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Tue Nov 22 18:28:49 2022 +0100

    CAMEL-15105: rework redelivery handling
---
 .../microprofile/faulttolerance/FaultToleranceProcessor.java | 2 +-
 .../org/apache/camel/component/resilience4j/ResilienceProcessor.java | 3 +--
 .../java/org/apache/camel/impl/engine/CamelInternalProcessor.java | 4 ++--
 .../src/main/java/org/apache/camel/processor/CatchProcessor.java | 4 ++--
 .../src/main/java/org/apache/camel/processor/MulticastProcessor.java | 2 +-
 .../main/java/org/apache/camel/processor/OnCompletionProcessor.java | 2 +-
 .../src/main/java/org/apache/camel/processor/PollEnricher.java | 2 +-
 .../org/apache/camel/processor/aggregate/AggregateProcessor.java | 5 ++---
 .../java/org/apache/camel/processor/errorhandler/NoErrorHandler.java | 3 +--
 .../apache/camel/support/BridgeExceptionHandlerToErrorHandler.java | 3 +--
 10 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 6c148e068ea..30b8cc538d7 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -548,7 +548,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
 exchange.setException(null);
 // and we should not be regarded as exhausted as we are in a try ..
// catch block - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); + exchange.getExchangeExtension().setRedeliveryExhausted(false); // run the fallback processor try { LOG.debug("Running fallback: {} with exchange: {}", fallbackProcessor, exchange); diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 83234a6b165..97a8aa9a17e 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -41,7 +41,6 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.ExtendedCamelContext; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Route; @@ -683,7 +682,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport exchange.setException(null); // and we should not be regarded as exhausted as we are in a try .. 
// catch block - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); + exchange.getExchangeExtension().setRedeliveryExhausted(false); // run the fallback processor try { LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index ff2e6c51067..05e7b3d1d17 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -961,7 +961,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In exchange.setException(tce); // because this is stream caching error then we cannot use redelivery as the message body is corrupt // so mark as redelivery exhausted - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true); + exchange.getExchangeExtension().setRedeliveryExhausted(true); } // check if we somewhere failed due to a stream caching exception Throwable cause = exchange.getException(); @@ -987,7 +987,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In exchange.setException(tce); // because this is stream caching error then we cannot use redelivery as the message body is corrupt // so mark as redelivery exhausted - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true); + exchange.getExchangeExtension().setRedeliveryExhausted(true); } } return null; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java index b13fc20f890..64be8c6814a 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java +++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java @@ -115,7 +115,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, exchange.setProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, e); exchange.setException(null); // and we should not be regarded as exhausted as we are in a try .. catch block - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); + exchange.getExchangeExtension().setRedeliveryExhausted(false); if (LOG.isDebugEnabled()) { LOG.debug("The exception is handled for the exception: {} caused by: {}", @@ -131,7 +131,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, false, null); // always clear redelivery exhausted in a catch clause - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); + exchange.getExchangeExtension().setRedeliveryExhausted(false); if (rollbackOnly || rollbackOnlyLast || stop) { exchange.setRouteStop(stop); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index af740f9b1c2..d4602468844 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -827,7 +827,7 @@ public class MulticastProcessor extends AsyncProcessorSupport // multicast uses error handling on its output processors and they have tried to redeliver // so we shall signal back to the other error handlers that we are exhausted and they should not // also try to redeliver as we would then do that twice - original.adapt(ExtendedExchange.class).setRedeliveryExhausted(exhaust); + original.getExchangeExtension().setRedeliveryExhausted(exhaust); } 
reactiveExecutor.schedule(callback); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index 58c708c29cc..fc7da959e16 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -177,7 +177,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac boolean rollbackOnlyLast = ee.isRollbackOnlyLast(); ee.setRollbackOnlyLast(false); // and we should not be regarded as exhausted as we are in a onCompletion block - boolean exhausted = ee.adapt(ExtendedExchange.class).isRedeliveryExhausted(); + boolean exhausted = ee.getExchangeExtension().isRedeliveryExhausted(); ee.setRedeliveryExhausted(false); Exception cause = ee.getException(); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java index 522a53f6944..4a1bd3605c3 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -322,7 +322,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout // restore caused exception exchange.setException(cause); // remove the exhausted marker as we want to be able to perform redeliveries with the error handler - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); + exchange.getExchangeExtension().setRedeliveryExhausted(false); // preserve the redelivery stats if (redeliveried != null) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index b7a4e9a7a6d..b96e38ae395 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -42,7 +42,6 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Expression; import org.apache.camel.ExtendedCamelContext; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Navigate; import org.apache.camel.NoSuchEndpointException; import org.apache.camel.Predicate; @@ -1435,8 +1434,8 @@ public class AggregateProcessor extends AsyncProcessorSupport // set redelivery counter exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); // and prepare for sending to DLC - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); - exchange.adapt(ExtendedExchange.class).setRollbackOnly(false); + exchange.getExchangeExtension().setRedeliveryExhausted(false); + exchange.setRollbackOnly(false); deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange); } catch (Throwable e) { exchange.setException(e); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java index 3a74e89c82e..59d00a72fd8 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/NoErrorHandler.java @@ -21,7 +21,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import 
org.apache.camel.Processor; import org.apache.camel.spi.ErrorHandler; import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter; @@ -47,7 +46,7 @@ public class NoErrorHandler extends ErrorHandlerSupport implements AsyncProcesso return output.process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); + exchange.getExchangeExtension().setRedeliveryExhausted(false); callback.done(doneSync); } }); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java index 88d6e806453..f56ddfbbe9b 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java @@ -18,7 +18,6 @@ package org.apache.camel.support; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.UnitOfWork; @@ -71,7 +70,7 @@ public class BridgeExceptionHandlerToErrorHandler implements ExceptionHandler { // mark as bridged exchange.setProperty(ExchangePropertyKey.ERRORHANDLER_BRIDGE, true); // and mark as redelivery exhausted as we cannot do redeliveries - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true); + exchange.getExchangeExtension().setRedeliveryExhausted(true); // wrap in UoW UnitOfWork uow = null;