This is an automated email from the ASF dual-hosted git repository. klease pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 27bba838e2c CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) 27bba838e2c is described below commit 27bba838e2c32e0d00a71760b8fa5600c3005859 Author: klease <38634989+kle...@users.noreply.github.com> AuthorDate: Mon Jul 11 23:19:03 2022 +0200 CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) * CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. Call the afterProcess from the MDCcallback instead of scheduling it separately. Reset current routeId on MDC from startProcess. * CAMEL-18093 - Add option to turn on follow redirects Signed-off-by: Rhuan Rocha <rhuan...@gmail.com> * CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. Call the afterProcess from the MDCcallback instead of scheduling it separately. Reset current routeId on MDC from startProcess. Co-authored-by: Rhuan Rocha <rhuan...@gmail.com> --- .../camel/impl/engine/CamelInternalProcessor.java | 9 +------- .../camel/impl/engine/DefaultUnitOfWork.java | 26 ++++++++++++++++++++-- .../apache/camel/impl/engine/MDCUnitOfWork.java | 7 +++++- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 7babd956ea8..868269ac0ab 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -401,14 +401,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- - // optimize to only do after uow processing if really needed - if (beforeAndAfter) { - // use the same callback as with beforeProcess - final CamelInternalTask afterCallback = afterTask; - reactiveExecutor.schedule(() -> { - uow.afterProcess(processor, exchange, afterCallback, sync); - }); - } + // CAMEL-18255: move uow.afterProcess handling to the callback if (LOG.isTraceEnabled()) { LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}", diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index 704d0d56f85..fa8c5c75f19 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -356,8 +356,8 @@ public class DefaultUnitOfWork implements UnitOfWork { @Override public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) { - // no wrapping needed - return callback; + // CAMEL-18255: support running afterProcess from the async callback + return isBeforeAfterProcess() ? new UnitOfWorkCallback(callback, processor) : callback; } @Override @@ -377,4 +377,26 @@ public class DefaultUnitOfWork implements UnitOfWork { public String toString() { return "DefaultUnitOfWork"; } + + private final class UnitOfWorkCallback implements AsyncCallback { + + private final AsyncCallback delegate; + private final Processor processor; + + private UnitOfWorkCallback(AsyncCallback delegate, Processor processor) { + this.delegate = delegate; + this.processor = processor; + } + + @Override + public void done(boolean doneSync) { + delegate.done(doneSync); + afterProcess(processor, exchange, delegate, doneSync); + } + + @Override + public String toString() { + return delegate.toString(); + } + } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java index 074521d0b1b..3cf1a52b289 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java @@ -85,6 +85,10 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service { if (breadcrumbId != null) { MDC.put(MDC_BREADCRUMB_ID, breadcrumbId); } + Route current = getRoute(); + if (current != null) { + MDC.put(MDC_ROUTE_ID, current.getRouteId()); + } } @Override @@ -145,7 +149,8 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service { MDC.put(MDC_STEP_ID, stepId); } // return callback with after processing work - return new MDCCallback(callback, pattern); + final AsyncCallback uowCallback = super.beforeProcess(processor, exchange, callback); + return new MDCCallback(uowCallback, pattern); } @Override