This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.4.x by this push:
     new 3f95697  CAMEL-15195: Camel-netty - RequestTimeout seems not working as expected (#3925) (#3932)
3f95697 is described below

commit 3f956977dc28e11799ad24294801fbd935107b03
Author: Amos Feng <zf...@redhat.com>
AuthorDate: Thu Jun 18 14:34:16 2020 +0800

    CAMEL-15195: Camel-netty - RequestTimeout seems not working as expected (#3925) (#3932)
    
    * add back the code which removes the "timeout" handler after receiving the response
    * add the "timeout" handler when activating from the connection pool
---
 .../apache/camel/component/netty/NettyProducer.java | 21 ++++++++++++++++++---
 .../netty/handlers/ClientChannelHandler.java        |  7 +++++++
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 51143e1..f915b82 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -109,14 +109,14 @@ public class NettyProducer extends DefaultAsyncProducer {
             config.timeBetweenEvictionRunsMillis = 30 * 1000L;
             config.minEvictableIdleTimeMillis = 
configuration.getProducerPoolMinEvictableIdle();
             config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
-            pool = new GenericObjectPool<>(new 
NettyProducerPoolableObjectFactory(), config);
+            pool = new GenericObjectPool<>(new 
NettyProducerPoolableObjectFactory(this), config);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Created NettyProducer pool[maxActive={}, 
minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}",
                         config.maxActive, config.minIdle, config.maxIdle, 
config.minEvictableIdleTimeMillis, pool);
             }
         } else {
-            pool = new SharedSingletonObjectPool<>(new 
NettyProducerPoolableObjectFactory());
+            pool = new SharedSingletonObjectPool<>(new 
NettyProducerPoolableObjectFactory(this));
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Created NettyProducer shared singleton pool -> {}", 
pool);
             }
@@ -555,6 +555,11 @@ public class NettyProducer extends DefaultAsyncProducer {
      * Object factory to create {@link Channel} used by the pool.
      */
     private final class NettyProducerPoolableObjectFactory implements 
PoolableObjectFactory<ChannelFuture> {
+        private NettyProducer producer;
+
+        public NettyProducerPoolableObjectFactory(NettyProducer producer) {
+            this.producer = producer;
+        }
 
         @Override
         public ChannelFuture makeObject() throws Exception {
@@ -603,8 +608,18 @@ public class NettyProducer extends DefaultAsyncProducer {
 
         @Override
         public void activateObject(ChannelFuture channelFuture) {
-            // noop
             LOG.trace("activateObject channel request: {}", channelFuture);
+
+            if (channelFuture.isSuccess() && 
producer.getConfiguration().getRequestTimeout() > 0) {
+                LOG.trace("reset the request timeout as we activate the 
channel");
+                Channel channel = channelFuture.channel();
+
+                ChannelHandler handler = channel.pipeline().get("timeout");
+                if (handler == null) {
+                    ChannelHandler timeout = new 
ReadTimeoutHandler(producer.getConfiguration().getRequestTimeout(), 
TimeUnit.MILLISECONDS);
+                    channel.pipeline().addBefore("handler", "timeout", 
timeout);
+                }
+            }
         }
 
         @Override
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
index a2bbb21..1947e6e 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.netty.handlers;
 
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.camel.AsyncCallback;
@@ -148,6 +149,12 @@ public class ClientChannelHandler extends 
SimpleChannelInboundHandler<Object> {
             LOG.trace("Message received: {}", msg);
         }
 
+        ChannelHandler handler = ctx.pipeline().get("timeout");
+        if (handler != null) {
+            LOG.trace("Removing timeout channel as we received message");
+            ctx.pipeline().remove(handler);
+        }
+
         NettyCamelState state = getState(ctx, msg);
         Exchange exchange = state != null ? state.getExchange() : null;
         if (exchange == null) {

Reply via email to