This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.7.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push: new dc82083 Explicitly calling doneSynchronizations on copy exchange objects as these objects have onCompletion added by NettyHttpProducer. Otherwise the original exchange objects will not have these SynchronizationAdapters set, which will result in a memory leak from NettyHttpProducer (#5274) dc82083 is described below commit dc8208328f916a10021d298bbfc460631bab2e73 Author: Samrat Dhillon <samrat.dhil...@gmail.com> AuthorDate: Mon Mar 29 09:54:39 2021 -0400 Explicitly calling doneSynchronizations on copy exchange objects as these objects have onCompletion added by NettyHttpProducer. Otherwise the original exchange objects will not have these SynchronizationAdapters set, which will result in a memory leak from NettyHttpProducer (#5274) Co-authored-by: Samrat Dhillon <samrat.dhil...@innovapost.com> --- .../microprofile/faulttolerance/FaultToleranceProcessor.java | 10 ++++++++++ .../camel/component/resilience4j/ResilienceProcessor.java | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java index e8eb374..9d5754c 100644 --- a/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java +++ b/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java @@ -44,8 +44,10 @@ import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.CircuitBreakerConstants; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.Synchronization; import 
org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.UnitOfWorkHelper; import org.apache.camel.util.ObjectHelper; import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException; import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException; @@ -317,6 +319,14 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); } + if (copy.getUnitOfWork() == null) { + // handover completions and done them manually to ensure they are being executed + List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions(); + UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG); + } else { + // done the unit of work + copy.getUnitOfWork().done(exchange); + } } catch (Throwable e) { exchange.setException(e); } diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index c9bd5ca..e8cb471 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -47,8 +47,10 @@ import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.CircuitBreakerConstants; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.Synchronization; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.UnitOfWorkHelper; import org.apache.camel.util.ObjectHelper; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -423,6 +425,15 @@ public class ResilienceProcessor extends AsyncProcessorSupport exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); } + if (copy.getUnitOfWork() == null) { + // handover completions and done them manually to ensure they are being executed + List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions(); + UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG); + } else { + // done the unit of work + copy.getUnitOfWork().done(exchange); + } + } catch (Throwable e) { exchange.setException(e); }