This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit c7f45b87f73f9d74e9cf7d81dbbd33cf8dab6700 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Mar 15 09:38:59 2021 +0100 camel-core - Fixed pooled leak in redelivery error handler. --- .../main/java/org/apache/camel/processor/Pipeline.java | 17 +---------------- .../org/apache/camel/processor/PooledTaskFactory.java | 13 ++++++++++++- .../processor/errorhandler/RedeliveryErrorHandler.java | 8 ++++++-- .../camel/support/PooledObjectFactorySupport.java | 9 +++++++++ 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java index 5389003..557afa3 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java @@ -179,35 +179,20 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); if (pooled) { - taskFactory = new PooledTaskFactory() { + taskFactory = new PooledTaskFactory(getId()) { @Override public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { return new PipelineTask(); } - - @Override - public String toString() { - return "PooledTaskFactory[capacity: " + getCapacity() + "]"; - } }; int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); taskFactory.setCapacity(capacity); } else { taskFactory = new PrototypeTaskFactory() { @Override - public boolean isPooled() { - return false; - } - - @Override public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { return new PipelineTask(); } - - @Override - public String toString() { - return "PrototypeTaskFactory"; - } }; } LOG.trace("Using TaskFactory: {}", taskFactory); diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java index 6cf306a..ac184ab 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java @@ -23,6 +23,13 @@ import org.apache.camel.support.PooledObjectFactorySupport; public abstract class PooledTaskFactory extends PooledObjectFactorySupport<PooledExchangeTask> implements PooledExchangeTaskFactory { + public PooledTaskFactory() { + } + + public PooledTaskFactory(Object source) { + super(source); + } + @Override public PooledExchangeTask acquire() { return pool.poll(); @@ -67,6 +74,10 @@ public abstract class PooledTaskFactory extends PooledObjectFactorySupport<Poole @Override public String toString() { - return "PooledTaskFactory[capacity: " + getCapacity() + "]"; + if (source != null) { + return "PooledTaskFactory[source: " + source + ", capacity: " + getCapacity() + "]"; + } else { + return "PooledTaskFactory[capacity: " + getCapacity() + "]"; + } } } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 1481748..26f03b8 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -46,6 +46,7 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer; import org.apache.camel.spi.ExchangeFormatter; +import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ReactiveExecutor; import 
org.apache.camel.spi.ShutdownPrepared; import org.apache.camel.spi.ShutdownStrategy; @@ -779,7 +780,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport outputAsync.process(exchange, doneSync -> { // only continue with callback if we are done if (isDone(exchange)) { - reactiveExecutor.schedule(callback); + AsyncCallback cb = callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); } else { // error occurred so loop back around and call ourselves reactiveExecutor.schedule(this); @@ -1594,7 +1597,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); if (pooled) { - taskFactory = new PooledTaskFactory() { + String id = output instanceof IdAware ? ((IdAware) output).getId() : output.toString(); + taskFactory = new PooledTaskFactory(id) { @Override public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { return simpleTask ? 
new SimpleTask() : new RedeliveryTask(); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java index 8568832..26d2f17 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java @@ -29,12 +29,21 @@ import org.apache.camel.support.service.ServiceSupport; */ public abstract class PooledObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> { + protected final Object source; protected UtilizationStatistics statistics; protected CamelContext camelContext; protected BlockingQueue<T> pool; protected int capacity = 100; protected boolean statisticsEnabled; + public PooledObjectFactorySupport() { + this.source = null; + } + + public PooledObjectFactorySupport(Object source) { + this.source = source; + } + @Override protected void doBuild() throws Exception { super.doBuild();