This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c7f45b87f73f9d74e9cf7d81dbbd33cf8dab6700
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Mar 15 09:38:59 2021 +0100

    camel-core - Fixed pooled leak in redelivery error handler.
---
 .../main/java/org/apache/camel/processor/Pipeline.java  | 17 +----------------
 .../org/apache/camel/processor/PooledTaskFactory.java   | 13 ++++++++++++-
 .../processor/errorhandler/RedeliveryErrorHandler.java  |  8 ++++++--
 .../camel/support/PooledObjectFactorySupport.java       |  9 +++++++++
 4 files changed, 28 insertions(+), 19 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
index 5389003..557afa3 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -179,35 +179,20 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
 
         boolean pooled = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
         if (pooled) {
-            taskFactory = new PooledTaskFactory() {
+            taskFactory = new PooledTaskFactory(getId()) {
                 @Override
                 public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
                     return new PipelineTask();
                 }
-
-                @Override
-                public String toString() {
-                    return "PooledTaskFactory[capacity: " + getCapacity() + 
"]";
-                }
             };
             int capacity = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
             taskFactory.setCapacity(capacity);
         } else {
             taskFactory = new PrototypeTaskFactory() {
                 @Override
-                public boolean isPooled() {
-                    return false;
-                }
-
-                @Override
                 public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
                     return new PipelineTask();
                 }
-
-                @Override
-                public String toString() {
-                    return "PrototypeTaskFactory";
-                }
             };
         }
         LOG.trace("Using TaskFactory: {}", taskFactory);
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
index 6cf306a..ac184ab 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
@@ -23,6 +23,13 @@ import org.apache.camel.support.PooledObjectFactorySupport;
 public abstract class PooledTaskFactory extends 
PooledObjectFactorySupport<PooledExchangeTask>
         implements PooledExchangeTaskFactory {
 
+    public PooledTaskFactory() {
+    }
+
+    public PooledTaskFactory(Object source) {
+        super(source);
+    }
+
     @Override
     public PooledExchangeTask acquire() {
         return pool.poll();
@@ -67,6 +74,10 @@ public abstract class PooledTaskFactory extends 
PooledObjectFactorySupport<Poole
 
     @Override
     public String toString() {
-        return "PooledTaskFactory[capacity: " + getCapacity() + "]";
+        if (source != null) {
+            return "PooledTaskFactory[source: " + source + ", capacity: " + 
getCapacity() + "]";
+        } else {
+            return "PooledTaskFactory[capacity: " + getCapacity() + "]";
+        }
     }
 }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 1481748..26f03b8 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -46,6 +46,7 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
 import org.apache.camel.spi.ExchangeFormatter;
+import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.ShutdownPrepared;
 import org.apache.camel.spi.ShutdownStrategy;
@@ -779,7 +780,9 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
                 outputAsync.process(exchange, doneSync -> {
                     // only continue with callback if we are done
                     if (isDone(exchange)) {
-                        reactiveExecutor.schedule(callback);
+                        AsyncCallback cb = callback;
+                        taskFactory.release(this);
+                        reactiveExecutor.schedule(cb);
                     } else {
                         // error occurred so loop back around and call 
ourselves
                         reactiveExecutor.schedule(this);
@@ -1594,7 +1597,8 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
 
         boolean pooled = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
         if (pooled) {
-            taskFactory = new PooledTaskFactory() {
+            String id = output instanceof IdAware ? ((IdAware) output).getId() 
: output.toString();
+            taskFactory = new PooledTaskFactory(id) {
                 @Override
                 public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
                     return simpleTask ? new SimpleTask() : new 
RedeliveryTask();
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
index 8568832..26d2f17 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
@@ -29,12 +29,21 @@ import org.apache.camel.support.service.ServiceSupport;
  */
 public abstract class PooledObjectFactorySupport<T> extends ServiceSupport 
implements PooledObjectFactory<T> {
 
+    protected final Object source;
     protected UtilizationStatistics statistics;
     protected CamelContext camelContext;
     protected BlockingQueue<T> pool;
     protected int capacity = 100;
     protected boolean statisticsEnabled;
 
+    public PooledObjectFactorySupport() {
+        this.source = null;
+    }
+
+    public PooledObjectFactorySupport(Object source) {
+        this.source = source;
+    }
+
     @Override
     protected void doBuild() throws Exception {
         super.doBuild();

Reply via email to