This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.11.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5c61c1a05a8131f137fbdbf535309b25d6a0daf4 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jun 25 11:24:58 2021 +0200 CAMEL-16704: camel-ahc - The producer should use its own worker pool to avoid blocking AHC/Netty threads when continuing routing after the response is received. --- .../apache/camel/component/ahc/AhcProducer.java | 28 +++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java index 15da32d..6bfe833 100644 --- a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java +++ b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.ahc; import java.io.ByteArrayOutputStream; +import java.util.concurrent.ExecutorService; import io.netty.handler.codec.http.HttpHeaders; import org.apache.camel.AsyncCallback; @@ -37,6 +38,8 @@ public class AhcProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(AhcProducer.class); private final AsyncHttpClient client; + private ExecutorService workerPool; + private boolean shutdownPool; public AhcProducer(AhcEndpoint endpoint) { super(endpoint); @@ -49,6 +52,24 @@ public class AhcProducer extends DefaultAsyncProducer { } @Override + protected void doStart() throws Exception { + if (workerPool == null) { + workerPool = getEndpoint().getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AhcWorkerPool"); + shutdownPool = true; + } + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + if (shutdownPool && workerPool != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(workerPool); + workerPool = null; + } + } + + @Override public boolean process(Exchange exchange, AsyncCallback 
callback) { try { // AHC supports async processing @@ -94,7 +115,8 @@ public class AhcProducer extends DefaultAsyncProducer { } catch (Exception e) { exchange.setException(e); } finally { - callback.done(false); + // use worker pool to continue routing to avoid blocking ahc/netty threads + workerPool.execute(callback); } } @@ -108,8 +130,8 @@ public class AhcProducer extends DefaultAsyncProducer { } catch (Exception e) { exchange.setException(e); } finally { - // signal we are done - callback.done(false); + // use worker pool to continue routing to avoid blocking ahc/netty threads + workerPool.execute(callback); } return exchange; }