This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.7.x by this push:
     new dc82083  Explicilty calling doneSynchronizations on copy exchange 
objects as these objects have onCompletion added by NettyHttpProducer. 
Otherwise original exchange objects will not have these SynchronizationAdapters 
set and will result in memory leak from NettyHttpProducer (#5274)
dc82083 is described below

commit dc8208328f916a10021d298bbfc460631bab2e73
Author: Samrat Dhillon <samrat.dhil...@gmail.com>
AuthorDate: Mon Mar 29 09:54:39 2021 -0400

    Explicilty calling doneSynchronizations on copy exchange objects as these 
objects have onCompletion added by NettyHttpProducer. Otherwise original 
exchange objects will not have these SynchronizationAdapters set and will 
result in memory leak from NettyHttpProducer (#5274)
    
    Co-authored-by: Samrat Dhillon <samrat.dhil...@innovapost.com>
---
 .../microprofile/faulttolerance/FaultToleranceProcessor.java  | 10 ++++++++++
 .../camel/component/resilience4j/ResilienceProcessor.java     | 11 +++++++++++
 2 files changed, 21 insertions(+)

diff --git 
a/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
 
b/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index e8eb374..9d5754c 100644
--- 
a/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ 
b/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -44,8 +44,10 @@ import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.util.ObjectHelper;
 import 
org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
 import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;
@@ -317,6 +319,14 @@ public class FaultToleranceProcessor extends 
AsyncProcessorSupport
                     
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, 
true);
                     
exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
                 }
+                if (copy.getUnitOfWork() == null) {
+                    // handover completions and done them manually to ensure 
they are being executed
+                    List<Synchronization> synchronizations = 
copy.adapt(ExtendedExchange.class).handoverCompletions();
+                    UnitOfWorkHelper.doneSynchronizations(copy, 
synchronizations, LOG);
+                } else {
+                    // done the unit of work
+                    copy.getUnitOfWork().done(exchange);
+                }
             } catch (Throwable e) {
                 exchange.setException(e);
             }
diff --git 
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
 
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index c9bd5ca..e8cb471 100644
--- 
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ 
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -47,8 +47,10 @@ import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -423,6 +425,15 @@ public class ResilienceProcessor extends 
AsyncProcessorSupport
                 
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, 
true);
                 
exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
             }
+            if (copy.getUnitOfWork() == null) {
+                // handover completions and done them manually to ensure they 
are being executed
+                List<Synchronization> synchronizations = 
copy.adapt(ExtendedExchange.class).handoverCompletions();
+                UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, 
LOG);
+            } else {
+                // done the unit of work
+                copy.getUnitOfWork().done(exchange);
+            }
+
         } catch (Throwable e) {
             exchange.setException(e);
         }

Reply via email to