This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 5443df2c033 CAMEL-15105: cleaned up handling the unit of work and onCompletions 5443df2c033 is described below commit 5443df2c0333469df47e37a2d50fcef52c1ff41c Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Wed Apr 26 19:20:19 2023 +0200 CAMEL-15105: cleaned up handling the unit of work and onCompletions --- .../org/apache/camel/support/AbstractExchange.java | 68 +--------------------- .../camel/support/DefaultPooledExchange.java | 6 -- .../camel/support/ExtendedExchangeExtension.java | 67 +++++++++++++++++++-- 3 files changed, 64 insertions(+), 77 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java index ba484068884..096f65eb59c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java @@ -66,9 +66,7 @@ class AbstractExchange implements Exchange { Message out; Exception exception; String exchangeId; - UnitOfWork unitOfWork; ExchangePattern pattern; - List<Synchronization> onCompletions; Boolean externalRedelivered; boolean routeStop; boolean rollbackOnly; @@ -92,11 +90,11 @@ class AbstractExchange implements Exchange { this.context = parent.getContext(); this.pattern = parent.getPattern(); this.created = parent.getCreated(); - this.unitOfWork = parent.getUnitOfWork(); privateExtension = new ExtendedExchangeExtension(this); privateExtension.setFromEndpoint(parent.getFromEndpoint()); privateExtension.setFromRouteId(parent.getFromRouteId()); + privateExtension.setUnitOfWork(parent.getUnitOfWork()); } public AbstractExchange(Endpoint fromEndpoint) { @@ -678,69 +676,7 @@ class AbstractExchange implements Exchange { @Override public UnitOfWork getUnitOfWork() { - return unitOfWork; - } - - void setUnitOfWork(UnitOfWork unitOfWork) { - this.unitOfWork = unitOfWork; - if (unitOfWork != null && onCompletions != null) { - // now an unit of work has been assigned so add the on completions - // we might have registered already - for (Synchronization onCompletion : onCompletions) { - unitOfWork.addSynchronization(onCompletion); - } - // cleanup the temporary on completion list as they now have been registered - // on the unit of work - onCompletions.clear(); - onCompletions = null; - } - } - - void addOnCompletion(Synchronization onCompletion) { - if (unitOfWork == null) { - // unit of work not yet registered so we store the on completion temporary - // until the unit of work is assigned to this exchange by the unit of work - if (onCompletions == null) { - onCompletions = new ArrayList<>(); - } - onCompletions.add(onCompletion); - } else { - getUnitOfWork().addSynchronization(onCompletion); - } - } - - boolean containsOnCompletion(Synchronization onCompletion) { - if (unitOfWork != null) { - // if there is an unit of work then the completions is moved there - return unitOfWork.containsSynchronization(onCompletion); - } else { - // check temporary completions if no unit of work yet - return onCompletions != null && onCompletions.contains(onCompletion); - } - } - - void handoverCompletions(Exchange target) { - if (onCompletions != null) { - for (Synchronization onCompletion : onCompletions) { - target.getExchangeExtension().addOnCompletion(onCompletion); - } - // cleanup the temporary on completion list as they have been handed over - onCompletions.clear(); - onCompletions = null; - } else if (unitOfWork != null) { - // let unit of work handover - unitOfWork.handoverSynchronization(target); - } - } - - List<Synchronization> handoverCompletions() { - List<Synchronization> answer = null; - if (onCompletions != null) { - answer = new ArrayList<>(onCompletions); - onCompletions.clear(); - onCompletions = null; - } - return answer; + return privateExtension.getUnitOfWork(); } /** diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java index 21ac295a2b7..958b6b92179 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java @@ -99,15 +99,9 @@ public final class DefaultPooledExchange extends AbstractExchange implements Poo out.reset(); this.out = null; } - if (this.unitOfWork != null) { - this.unitOfWork.reset(); - } this.exception = null; // reset pattern to original this.pattern = originalPattern; - if (this.onCompletions != null) { - this.onCompletions.clear(); - } // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again this.externalRedelivered = null; this.routeStop = false; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java index 8080af78e0c..f1f13fb139a 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java @@ -17,6 +17,7 @@ package org.apache.camel.support; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -44,6 +45,8 @@ public class ExtendedExchangeExtension implements ExchangeExtension { private boolean interruptable = true; private boolean interrupted; private AsyncCallback defaultConsumerCallback; // optimize (do not reset) + private UnitOfWork unitOfWork; + private List<Synchronization> onCompletions; ExtendedExchangeExtension(AbstractExchange exchange) { this.exchange = exchange; @@ -84,7 +87,16 @@ public class ExtendedExchangeExtension implements ExchangeExtension { @Override public void addOnCompletion(Synchronization onCompletion) { - this.exchange.addOnCompletion(onCompletion); + if (unitOfWork == null) { + // unit of work not yet registered so we store the on completion temporary + // until the unit of work is assigned to this exchange by the unit of work + if (onCompletions == null) { + onCompletions = new ArrayList<>(); + } + onCompletions.add(onCompletion); + } else { + unitOfWork.addSynchronization(onCompletion); + } } @Override @@ -119,17 +131,44 @@ public class ExtendedExchangeExtension implements ExchangeExtension { @Override public void handoverCompletions(Exchange target) { - this.exchange.handoverCompletions(target); + if (onCompletions != null) { + for (Synchronization onCompletion : onCompletions) { + target.getExchangeExtension().addOnCompletion(onCompletion); + } + // cleanup the temporary on completion list as they have been handed over + onCompletions.clear(); + onCompletions = null; + } else if (unitOfWork != null) { + // let unit of work handover + unitOfWork.handoverSynchronization(target); + } } @Override public List<Synchronization> handoverCompletions() { - return this.exchange.handoverCompletions(); + List<Synchronization> answer = null; + if (onCompletions != null) { + answer = new ArrayList<>(onCompletions); + onCompletions.clear(); + onCompletions = null; + } + return answer; } @Override public void setUnitOfWork(UnitOfWork unitOfWork) { - this.exchange.setUnitOfWork(unitOfWork); + this.unitOfWork = unitOfWork; + if (unitOfWork != null && onCompletions != null) { + // now an unit of work has been assigned so add the on completions + // we might have registered already + for (Synchronization onCompletion : onCompletions) { + unitOfWork.addSynchronization(onCompletion); + } + // cleanup the temporary on completion list as they now have been registered + // on the unit of work + onCompletions.clear(); + onCompletions = null; + } } @Override @@ -189,7 +228,13 @@ public class ExtendedExchangeExtension implements ExchangeExtension { @Override public boolean containsOnCompletion(Synchronization onCompletion) { - return this.exchange.containsOnCompletion(onCompletion); + if (unitOfWork != null) { + // if there is an unit of work then the completions is moved there + return unitOfWork.containsSynchronization(onCompletion); + } else { + // check temporary completions if no unit of work yet + return onCompletions != null && onCompletions.contains(onCompletion); + } } @Override @@ -253,7 +298,19 @@ public class ExtendedExchangeExtension implements ExchangeExtension { this.failureHandled = failureHandled; } + public UnitOfWork getUnitOfWork() { + return unitOfWork; + } + public void reset() { + if (this.unitOfWork != null) { + this.unitOfWork.reset(); + } + + if (this.onCompletions != null) { + this.onCompletions.clear(); + } + setHistoryNodeId(null); setHistoryNodeLabel(null); setTransacted(false);