CAMEL-9003 - Use header for overriding request timeouts instead
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5187b5de Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5187b5de Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5187b5de Branch: refs/heads/master Commit: 5187b5dea17062ab05afd2e9e40615ef6609d27f Parents: 9ce76f9 Author: Jonathan Anstey <jans...@gmail.com> Authored: Thu Jul 23 15:23:20 2015 -0230 Committer: Jonathan Anstey <jans...@gmail.com> Committed: Thu Jul 23 15:23:48 2015 -0230 ---------------------------------------------------------------------- .../camel/component/netty/NettyConstants.java | 1 + .../camel/component/netty/NettyEndpoint.java | 14 +----- .../camel/component/netty/NettyProducer.java | 14 ++++++ .../netty/handlers/ClientChannelHandler.java | 15 +++--- .../netty/NettyCachedRequestTimeoutTest.java | 53 -------------------- .../netty/NettyRequestTimeoutTest.java | 11 ++++ .../camel/component/netty4/NettyConstants.java | 1 + .../camel/component/netty4/NettyEndpoint.java | 12 +---- .../camel/component/netty4/NettyProducer.java | 13 +++++ .../netty4/handlers/ClientChannelHandler.java | 10 ++-- .../netty4/NettyCachedRequestTimeoutTest.java | 53 -------------------- .../netty4/NettyRequestTimeoutTest.java | 11 ++++ 12 files changed, 65 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java index 4082c7d..eed8266 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java @@ -28,6 +28,7 @@ public final class NettyConstants { public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent"; public static final String NETTY_REMOTE_ADDRESS = "CamelNettyRemoteAddress"; public static final String NETTY_LOCAL_ADDRESS = "CamelNettyLocalAddress"; + public static final String NETTY_REQUEST_TIMEOUT = "CamelNettyRequestTimeout"; public static final String NETTY_SSL_SESSION = "CamelNettySSLSession"; public static final String NETTY_SSL_CLIENT_CERT_SUBJECT_NAME = "CamelNettySSLClientCertSubjectName"; public static final String NETTY_SSL_CLIENT_CERT_ISSUER_NAME = "CamelNettySSLClientCertIssuerName"; http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java index 81b8648..e8fcbcc 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java @@ -98,20 +98,10 @@ public class NettyEndpoint extends DefaultEndpoint { @Override protected String createEndpointUri() { ObjectHelper.notNull(configuration, "configuration"); - return "netty:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort() - + ((getConfiguration().getRequestTimeout() > 0) ? "?requestTimeout=" + getConfiguration().getRequestTimeout() : ""); + return "netty:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort(); } @Override - public String getEndpointUri() { - if (getConfiguration().getRequestTimeout() > 0) { - return super.getEndpointUri() + "?requestTimeout=" + getConfiguration().getRequestTimeout(); - } else { - return super.getEndpointUri(); - } - } - - @Override protected void doStart() throws Exception { ObjectHelper.notNull(timer, "timer"); } @@ -177,4 +167,4 @@ public class NettyEndpoint extends DefaultEndpoint { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index bf72284..1bfe547 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -39,9 +39,11 @@ import org.apache.commons.pool.impl.GenericObjectPool; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelConfig; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.SucceededChannelFuture; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; @@ -52,6 +54,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool; import org.jboss.netty.channel.socket.nio.WorkerPool; +import org.jboss.netty.handler.timeout.ReadTimeoutHandler; import org.jboss.netty.util.ExternalResourceReleasable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -246,6 +249,17 @@ public class NettyProducer extends DefaultAsyncProducer { return true; } + if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) { + long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class); + ChannelHandler oldHandler = existing.getPipeline().get("timeout"); + ReadTimeoutHandler newHandler = new ReadTimeoutHandler(getEndpoint().getTimer(), timeoutInMs, TimeUnit.MILLISECONDS); + if (oldHandler == null) { + existing.getPipeline().addBefore("handler", "timeout", newHandler); + } else { + existing.getPipeline().replace(oldHandler, "timeout", newHandler); + } + } + // need to declare as final final Channel channel = existing; final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback); http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java index c314e39..8988ad5 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java @@ -138,19 +138,18 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler { LOG.trace("Message received: {}", messageEvent); } - if (producer.getConfiguration().getRequestTimeout() > 0) { - ChannelHandler handler = ctx.getPipeline().get("timeout"); - if (handler != null) { - LOG.trace("Removing timeout channel as we received message"); - ctx.getPipeline().remove(handler); - } + ChannelHandler handler = ctx.getPipeline().get("timeout"); + if (handler != null) { + LOG.trace("Removing timeout channel as we received message"); + ctx.getPipeline().remove(handler); } - + Exchange exchange = getExchange(ctx); if (exchange == null) { // we just ignore the received message as the channel is closed return; - } + } + AsyncCallback callback = getAsyncCallback(ctx); Message message; http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java deleted file mode 100644 index fcb7e37..0000000 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.netty; - -import org.apache.camel.builder.RouteBuilder; -import org.junit.Test; - -/** - * @version - */ -public class NettyCachedRequestTimeoutTest extends BaseNettyTest { - - @Test - public void testRequestTimeoutKeyInProducerCache() throws Exception { - assertEquals(0, template.getCurrentCacheSize()); - String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class); - assertEquals("Bye World", out); - out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class); - assertEquals("Bye World", out); - assertEquals(1, template.getCurrentCacheSize()); - - template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1001", "Hello Camel", String.class); - assertEquals(2, template.getCurrentCacheSize()); - template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1002", "Hello Camel", String.class); - assertEquals(3, template.getCurrentCacheSize()); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("netty:tcp://localhost:{{port}}?textline=true&sync=true") - .transform().constant("Bye World"); - - } - }; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java index 94f9e79..8b499ce 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java @@ -46,6 +46,17 @@ public class NettyRequestTimeoutTest extends BaseNettyTest { } @Test + public void testRequestTimeoutViaHeader() throws Exception { + try { + template.requestBodyAndHeader("netty:tcp://localhost:{{port}}?textline=true&sync=true", "Hello Camel", NettyConstants.NETTY_REQUEST_TIMEOUT, 1000, String.class); + fail("Should have thrown exception"); + } catch (CamelExecutionException e) { + ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause()); + assertNotNull(cause); + } + } + + @Test public void testRequestTimeoutAndOk() throws Exception { try { template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class); http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java index e381e61..5466c2a 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java @@ -34,6 +34,7 @@ public final class NettyConstants { public static final String NETTY_SSL_CLIENT_CERT_SERIAL_NO = "CamelNettySSLClientCertSerialNumber"; public static final String NETTY_SSL_CLIENT_CERT_NOT_BEFORE = "CamelNettySSLClientCertNotBefore"; public static final String NETTY_SSL_CLIENT_CERT_NOT_AFTER = "CamelNettySSLClientCertNotAfter"; + public static final String NETTY_REQUEST_TIMEOUT = "CamelNettyRequestTimeout"; private NettyConstants() { // Utility class http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java index 5669e9d..1b57b11 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java @@ -89,18 +89,8 @@ public class NettyEndpoint extends DefaultEndpoint { @Override protected String createEndpointUri() { ObjectHelper.notNull(configuration, "configuration"); - return "netty4:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort() - + ((getConfiguration().getRequestTimeout() > 0) ? "?requestTimeout=" + getConfiguration().getRequestTimeout() : ""); + return "netty4:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort(); } - - @Override - public String getEndpointUri() { - if (getConfiguration().getRequestTimeout() > 0) { - return super.getEndpointUri() + "?requestTimeout=" + getConfiguration().getRequestTimeout(); - } else { - return super.getEndpointUri(); - } - } protected SSLSession getSSLSession(ChannelHandlerContext ctx) { final SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java index 965397f..14dab4b 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java @@ -28,6 +28,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; @@ -35,6 +36,7 @@ import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.concurrent.ImmediateEventExecutor; import org.apache.camel.AsyncCallback; @@ -216,6 +218,17 @@ public class NettyProducer extends DefaultAsyncProducer { return true; } + if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) { + long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class); + ChannelHandler oldHandler = existing.pipeline().get("timeout"); + ReadTimeoutHandler newHandler = new ReadTimeoutHandler(timeoutInMs, TimeUnit.MILLISECONDS); + if (oldHandler == null) { + existing.pipeline().addBefore("handler", "timeout", newHandler); + } else { + existing.pipeline().replace(oldHandler, "timeout", newHandler); + } + } + // need to declare as final final Channel channel = existing; final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback); http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java index 28d6e22..dd64cb6 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java @@ -130,12 +130,10 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { LOG.trace("Message received: {}", msg); } - if (producer.getConfiguration().getRequestTimeout() > 0) { - ChannelHandler handler = ctx.pipeline().get("timeout"); - if (handler != null) { - LOG.trace("Removing timeout channel as we received message"); - ctx.pipeline().remove(handler); - } + ChannelHandler handler = ctx.pipeline().get("timeout"); + if (handler != null) { + LOG.trace("Removing timeout channel as we received message"); + ctx.pipeline().remove(handler); } Exchange exchange = getExchange(ctx); http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java deleted file mode 100644 index 93765cd..0000000 --- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.netty4; - -import org.apache.camel.builder.RouteBuilder; -import org.junit.Test; - -/** - * @version - */ -public class NettyCachedRequestTimeoutTest extends BaseNettyTest { - - @Test - public void testRequestTimeoutKeyInProducerCache() throws Exception { - assertEquals(0, template.getCurrentCacheSize()); - String out = template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class); - assertEquals("Bye World", out); - out = template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class); - assertEquals("Bye World", out); - assertEquals(1, template.getCurrentCacheSize()); - - template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1001", "Hello Camel", String.class); - assertEquals(2, template.getCurrentCacheSize()); - template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1002", "Hello Camel", String.class); - assertEquals(3, template.getCurrentCacheSize()); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("netty4:tcp://localhost:{{port}}?textline=true&sync=true") - .transform().constant("Bye World"); - - } - }; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java index fe98395..770fab5 100644 --- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java @@ -46,6 +46,17 @@ public class NettyRequestTimeoutTest extends BaseNettyTest { } @Test + public void testRequestTimeoutViaHeader() throws Exception { + try { + template.requestBodyAndHeader("netty4:tcp://localhost:{{port}}?textline=true&sync=true", "Hello Camel", NettyConstants.NETTY_REQUEST_TIMEOUT, 1000, String.class); + fail("Should have thrown exception"); + } catch (CamelExecutionException e) { + ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause()); + assertNotNull(cause); + } + } + + @Test public void testRequestTimeoutAndOk() throws Exception { try { template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);