This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.11.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5c61c1a05a8131f137fbdbf535309b25d6a0daf4
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Jun 25 11:24:58 2021 +0200

    CAMEL-16704: camel-ahc - The producer should use its own worker pool to 
avoid blocking AHC/Netty threads when continuing routing after the response is 
received.
---
 .../apache/camel/component/ahc/AhcProducer.java    | 28 +++++++++++++++++++---
 1 file changed, 25 insertions(+), 3 deletions(-)

diff --git 
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java
 
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java
index 15da32d..6bfe833 100644
--- 
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java
+++ 
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.ahc;
 
 import java.io.ByteArrayOutputStream;
+import java.util.concurrent.ExecutorService;
 
 import io.netty.handler.codec.http.HttpHeaders;
 import org.apache.camel.AsyncCallback;
@@ -37,6 +38,8 @@ public class AhcProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AhcProducer.class);
     private final AsyncHttpClient client;
+    private ExecutorService workerPool;
+    private boolean shutdownPool;
 
     public AhcProducer(AhcEndpoint endpoint) {
         super(endpoint);
@@ -49,6 +52,24 @@ public class AhcProducer extends DefaultAsyncProducer {
     }
 
     @Override
+    protected void doStart() throws Exception {
+        if (workerPool == null) {
+            workerPool = 
getEndpoint().getCamelContext().getExecutorServiceManager().newCachedThreadPool(this,
 "AhcWorkerPool");
+            shutdownPool = true;
+        }
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (shutdownPool && workerPool != null) {
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(workerPool);
+            workerPool = null;
+        }
+    }
+
+    @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
             // AHC supports async processing
@@ -94,7 +115,8 @@ public class AhcProducer extends DefaultAsyncProducer {
             } catch (Exception e) {
                 exchange.setException(e);
             } finally {
-                callback.done(false);
+                // use worker pool to continue routing to avoid blocking 
ahc/netty threads
+                workerPool.execute(callback);
             }
         }
 
@@ -108,8 +130,8 @@ public class AhcProducer extends DefaultAsyncProducer {
             } catch (Exception e) {
                 exchange.setException(e);
             } finally {
-                // signal we are done
-                callback.done(false);
+                // use worker pool to continue routing to avoid blocking 
ahc/netty threads
+                workerPool.execute(callback);
             }
             return exchange;
         }

Reply via email to