This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-3.4.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.4.x by this push: new 3f95697 CAMEL-15195: Camel-netty - RequestTimeout seems not working as expected (#3925) (#3932) 3f95697 is described below commit 3f956977dc28e11799ad24294801fbd935107b03 Author: Amos Feng <zf...@redhat.com> AuthorDate: Thu Jun 18 14:34:16 2020 +0800 CAMEL-15195: Camel-netty - RequestTimeout seems not working as expected (#3925) (#3932) * add back the codes which removes the "timeout" handler after receiving the response * add the "timeout" handler when activating from the connection pool --- .../apache/camel/component/netty/NettyProducer.java | 21 ++++++++++++++++++--- .../netty/handlers/ClientChannelHandler.java | 7 +++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index 51143e1..f915b82 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -109,14 +109,14 @@ public class NettyProducer extends DefaultAsyncProducer { config.timeBetweenEvictionRunsMillis = 30 * 1000L; config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle(); config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL; - pool = new GenericObjectPool<>(new NettyProducerPoolableObjectFactory(), config); + pool = new GenericObjectPool<>(new NettyProducerPoolableObjectFactory(this), config); if (LOG.isDebugEnabled()) { LOG.debug("Created NettyProducer pool[maxActive={}, minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}", config.maxActive, config.minIdle, config.maxIdle, config.minEvictableIdleTimeMillis, pool); } } else { - pool = new SharedSingletonObjectPool<>(new NettyProducerPoolableObjectFactory()); + pool = new SharedSingletonObjectPool<>(new NettyProducerPoolableObjectFactory(this)); if (LOG.isDebugEnabled()) { LOG.debug("Created NettyProducer shared singleton pool -> {}", pool); } @@ -555,6 +555,11 @@ public class NettyProducer extends DefaultAsyncProducer { * Object factory to create {@link Channel} used by the pool. */ private final class NettyProducerPoolableObjectFactory implements PoolableObjectFactory<ChannelFuture> { + private NettyProducer producer; + + public NettyProducerPoolableObjectFactory(NettyProducer producer) { + this.producer = producer; + } @Override public ChannelFuture makeObject() throws Exception { @@ -603,8 +608,18 @@ public class NettyProducer extends DefaultAsyncProducer { @Override public void activateObject(ChannelFuture channelFuture) { - // noop LOG.trace("activateObject channel request: {}", channelFuture); + + if (channelFuture.isSuccess() && producer.getConfiguration().getRequestTimeout() > 0) { + LOG.trace("reset the request timeout as we activate the channel"); + Channel channel = channelFuture.channel(); + + ChannelHandler handler = channel.pipeline().get("timeout"); + if (handler == null) { + ChannelHandler timeout = new ReadTimeoutHandler(producer.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS); + channel.pipeline().addBefore("handler", "timeout", timeout); + } + } } @Override diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java index a2bbb21..1947e6e 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.netty.handlers; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.apache.camel.AsyncCallback; @@ -148,6 +149,12 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { LOG.trace("Message received: {}", msg); } + ChannelHandler handler = ctx.pipeline().get("timeout"); + if (handler != null) { + LOG.trace("Removing timeout channel as we received message"); + ctx.pipeline().remove(handler); + } + NettyCamelState state = getState(ctx, msg); Exchange exchange = state != null ? state.getExchange() : null; if (exchange == null) {