This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 57d5f57077add6e71884e26e08c1fea9528d3bd7 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Wed Nov 23 09:11:11 2022 +0100 CAMEL-15105: rework handling the unit of work --- .../src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java | 3 +-- .../org/apache/camel/component/jetty/CamelContinuationServlet.java | 4 +--- .../microprofile/faulttolerance/FaultToleranceProcessor.java | 2 +- .../apache/camel/component/resilience4j/ResilienceProcessor.java | 2 +- .../java/org/apache/camel/impl/engine/CamelInternalProcessor.java | 6 ++---- .../main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java | 3 +-- .../org/apache/camel/converter/stream/CachedOutputStreamTest.java | 3 +-- .../java/org/apache/camel/support/EventDrivenPollingConsumer.java | 2 +- 8 files changed, 9 insertions(+), 16 deletions(-) diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java index 5c909c56021..ba7f5dfc9b2 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java @@ -30,7 +30,6 @@ import javax.security.auth.login.Configuration; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -251,7 +250,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer { protected void updateNewExchange(Exchange exchange, int index, HdfsInputStream hdfsFile) { // do not share unit of work - exchange.adapt(ExtendedExchange.class).setUnitOfWork(null); + exchange.getExchangeExtension().setUnitOfWork(null); exchange.setProperty(ExchangePropertyKey.SPLIT_INDEX, index); diff --git a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java index c4fece46b23..82f1f428de6 100644 --- a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java +++ b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java @@ -33,7 +33,6 @@ import jakarta.servlet.http.HttpServletResponse; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.http.common.CamelServlet; import org.apache.camel.http.common.HttpCommonEndpoint; @@ -239,8 +238,7 @@ public class CamelContinuationServlet extends CamelServlet { } } else if (uow.onPrepare(exchange)) { // need to re-attach uow - ExtendedExchange ee = (ExtendedExchange) exchange; - ee.setUnitOfWork(uow); + exchange.getExchangeExtension().setUnitOfWork(uow); } ClassLoader oldTccl = overrideTccl(exchange); diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java index 8ccaca4bab2..6c148e068ea 100644 --- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java +++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java @@ -445,7 +445,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport } else { // prepare uow on copy uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy); - copy.adapt(ExtendedExchange.class).setUnitOfWork(uow); + copy.getExchangeExtension().setUnitOfWork(uow); // the copy must be starting from the route where its copied from Route route = ExchangeHelper.getRoute(exchange); if (route != null) { diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 04d755d1e07..83234a6b165 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -526,7 +526,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport } else { // prepare uow on copy uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy); - copy.adapt(ExtendedExchange.class).setUnitOfWork(uow); + copy.getExchangeExtension().setUnitOfWork(uow); // the copy must be starting from the route where its copied from Route route = ExchangeHelper.getRoute(exchange); if (route != null) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 7f9630868bf..ff2e6c51067 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -782,15 +782,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In // If there is no existing UoW, then we should start one and // terminate it once processing is completed for the exchange. created = createUnitOfWork(exchange); - ExtendedExchange ee = (ExtendedExchange) exchange; - ee.setUnitOfWork(created); + exchange.getExchangeExtension().setUnitOfWork(created); uow = created; } else { // reuse existing exchange if (uow.onPrepare(exchange)) { // need to re-attach uow - ExtendedExchange ee = (ExtendedExchange) exchange; - ee.setUnitOfWork(uow); + exchange.getExchangeExtension().setUnitOfWork(uow); // we are prepared for reuse and can regard it as-if we created the unit of work // so the after method knows that this is the outer bounds and should done the unit of work created = uow; diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index 5ddfd782329..3c6cb7af9d6 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -275,8 +275,7 @@ public class DefaultUnitOfWork implements UnitOfWork { protected void onDone() { // MUST clear and set uow to null on exchange after done // in case the same exchange is manually reused by Camel end users (should happen seldom) - ExtendedExchange ee = (ExtendedExchange) exchange; - ee.setUnitOfWork(null); + exchange.getExchangeExtension().setUnitOfWork(null); } @Override diff --git a/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java b/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java index 916d9c49e46..0b13f65e153 100644 --- a/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java @@ -28,7 +28,6 @@ import java.util.StringJoiner; import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.StreamCache; import org.apache.camel.converter.IOConverter; import org.apache.camel.impl.engine.DefaultUnitOfWork; @@ -66,7 +65,7 @@ public class CachedOutputStreamTest extends ContextTestSupport { exchange = new DefaultExchange(context); UnitOfWork uow = new DefaultUnitOfWork(exchange); - exchange.adapt(ExtendedExchange.class).setUnitOfWork(uow); + exchange.getExchangeExtension().setUnitOfWork(uow); } @Override diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java index 620b98fbe52..55ab8afbb6e 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java @@ -213,7 +213,7 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement // we want the copy to have an uow UnitOfWork uow = getEndpoint().getCamelContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory() .createUnitOfWork(copy); - copy.adapt(ExtendedExchange.class).setUnitOfWork(uow); + copy.getExchangeExtension().setUnitOfWork(uow); return copy; }