This is an automated email from the ASF dual-hosted git repository.

klease pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 27bba838e2c CAMEL-18255: Address memory leak caused by scheduling 
MDC.afterProcess. (#7987)
27bba838e2c is described below

commit 27bba838e2c32e0d00a71760b8fa5600c3005859
Author: klease <38634989+kle...@users.noreply.github.com>
AuthorDate: Mon Jul 11 23:19:03 2022 +0200

    CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. 
(#7987)
    
    * CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess.
    Call the afterProcess from the MDCcallback instead of scheduling it
    separately. Reset current routeId on MDC from startProcess.
    
    * CAMEL-18093 - Add option to turn on follow redirects
    
    Signed-off-by: Rhuan Rocha <rhuan...@gmail.com>
    
    * CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess.
    Call the afterProcess from the MDCcallback instead of scheduling it
    separately. Reset current routeId on MDC from startProcess.
    
    Co-authored-by: Rhuan Rocha <rhuan...@gmail.com>
---
 .../camel/impl/engine/CamelInternalProcessor.java  |  9 +-------
 .../camel/impl/engine/DefaultUnitOfWork.java       | 26 ++++++++++++++++++++--
 .../apache/camel/impl/engine/MDCUnitOfWork.java    |  7 +++++-
 3 files changed, 31 insertions(+), 11 deletions(-)

diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 7babd956ea8..868269ac0ab 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -401,14 +401,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            // optimize to only do after uow processing if really needed
-            if (beforeAndAfter) {
-                // use the same callback as with beforeProcess
-                final CamelInternalTask afterCallback = afterTask;
-                reactiveExecutor.schedule(() -> {
-                    uow.afterProcess(processor, exchange, afterCallback, sync);
-                });
-            }
+            // CAMEL-18255: move uow.afterProcess handling to the callback
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Exchange processed and is continued routed {} for 
exchangeId: {} -> {}",
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 704d0d56f85..fa8c5c75f19 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -356,8 +356,8 @@ public class DefaultUnitOfWork implements UnitOfWork {
 
     @Override
     public AsyncCallback beforeProcess(Processor processor, Exchange exchange, 
AsyncCallback callback) {
-        // no wrapping needed
-        return callback;
+        // CAMEL-18255: support running afterProcess from the async callback
+        return isBeforeAfterProcess() ? new UnitOfWorkCallback(callback, 
processor) : callback;
     }
 
     @Override
@@ -377,4 +377,26 @@ public class DefaultUnitOfWork implements UnitOfWork {
     public String toString() {
         return "DefaultUnitOfWork";
     }
+
+    private final class UnitOfWorkCallback implements AsyncCallback {
+
+        private final AsyncCallback delegate;
+        private final Processor processor;
+
+        private UnitOfWorkCallback(AsyncCallback delegate, Processor 
processor) {
+            this.delegate = delegate;
+            this.processor = processor;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            delegate.done(doneSync);
+            afterProcess(processor, exchange, delegate, doneSync);
+        }
+
+        @Override
+        public String toString() {
+            return delegate.toString();
+        }
+    }
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index 074521d0b1b..3cf1a52b289 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -85,6 +85,10 @@ public class MDCUnitOfWork extends DefaultUnitOfWork 
implements Service {
         if (breadcrumbId != null) {
             MDC.put(MDC_BREADCRUMB_ID, breadcrumbId);
         }
+        Route current = getRoute();
+        if (current != null) {
+            MDC.put(MDC_ROUTE_ID, current.getRouteId());
+        }
     }
 
     @Override
@@ -145,7 +149,8 @@ public class MDCUnitOfWork extends DefaultUnitOfWork 
implements Service {
             MDC.put(MDC_STEP_ID, stepId);
         }
         // return callback with after processing work
-        return new MDCCallback(callback, pattern);
+        final AsyncCallback uowCallback = super.beforeProcess(processor, 
exchange, callback);
+        return new MDCCallback(uowCallback, pattern);
     }
 
     @Override

Reply via email to