CAMEL-5819: Added requestTimeout option to netty producer. Also added options to control the logging level on the netty consumer, so it's less noisy by default about closed channels when clients disconnect abruptly.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/048601dc Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/048601dc Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/048601dc Branch: refs/heads/camel-2.11.x Commit: 048601dc5aa28d432f3751f23be8dd53ecd1426a Parents: bcb0f00 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jun 4 16:46:00 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jun 4 16:47:30 2013 +0200 ---------------------------------------------------------------------- .../netty/DefaultClientPipelineFactory.java | 11 ++ .../camel/component/netty/NettyConfiguration.java | 27 +++++ .../camel/component/netty/NettyConsumer.java | 1 + .../netty/NettyConsumerExceptionHandler.java | 66 +++++++++++ .../netty/handlers/ServerChannelHandler.java | 8 +- .../component/netty/NettyRequestTimeoutTest.java | 84 +++++++++++++++ 6 files changed, 193 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java index cc7cc05..9503fac 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java @@ -17,6 +17,7 @@ package org.apache.camel.component.netty; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -27,6 +28,7 @@ import 
org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.handler.timeout.ReadTimeoutHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +77,15 @@ public class DefaultClientPipelineFactory extends ClientPipelineFactory { addToPipeline("encoder-" + x, channelPipeline, encoder); } + // do we use request timeout? + if (producer.getConfiguration().getRequestTimeout() > 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Using request timeout {} millis", producer.getConfiguration().getRequestTimeout()); + } + ChannelHandler timeout = new ReadTimeoutHandler(NettyComponent.getTimer(), producer.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS); + addToPipeline("timeout", channelPipeline, timeout); + } + // our handler must be added last addToPipeline("handler", channelPipeline, new ClientChannelHandler(producer)); http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java index ddbc58c..af86443 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java @@ -47,6 +47,7 @@ public class NettyConfiguration implements Cloneable { private boolean tcpNoDelay = true; private boolean broadcast; private long connectTimeout = 10000; + private long requestTimeout; private boolean reuseAddress = true; private boolean sync = true; private boolean textline; @@ -74,6 +75,8 @@ public class NettyConfiguration 
implements Cloneable { private boolean transferExchange; private boolean disconnectOnNoReply = true; private LoggingLevel noReplyLogLevel = LoggingLevel.WARN; + private LoggingLevel serverExceptionCaughtLogLevel = LoggingLevel.WARN; + private LoggingLevel serverClosedChannelExceptionCaughtLogLevel = LoggingLevel.DEBUG; private boolean allowDefaultCodec = true; private ClientPipelineFactory clientPipelineFactory; private ServerPipelineFactory serverPipelineFactory; @@ -275,6 +278,14 @@ public class NettyConfiguration implements Cloneable { this.connectTimeout = connectTimeout; } + public long getRequestTimeout() { + return requestTimeout; + } + + public void setRequestTimeout(long requestTimeout) { + this.requestTimeout = requestTimeout; + } + public boolean isReuseAddress() { return reuseAddress; } @@ -515,6 +526,22 @@ public class NettyConfiguration implements Cloneable { this.noReplyLogLevel = noReplyLogLevel; } + public LoggingLevel getServerExceptionCaughtLogLevel() { + return serverExceptionCaughtLogLevel; + } + + public void setServerExceptionCaughtLogLevel(LoggingLevel serverExceptionCaughtLogLevel) { + this.serverExceptionCaughtLogLevel = serverExceptionCaughtLogLevel; + } + + public LoggingLevel getServerClosedChannelExceptionCaughtLogLevel() { + return serverClosedChannelExceptionCaughtLogLevel; + } + + public void setServerClosedChannelExceptionCaughtLogLevel(LoggingLevel serverClosedChannelExceptionCaughtLogLevel) { + this.serverClosedChannelExceptionCaughtLogLevel = serverClosedChannelExceptionCaughtLogLevel; + } + public boolean isAllowDefaultCodec() { return allowDefaultCodec; } http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java index 595e587..46dbb5b 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java @@ -56,6 +56,7 @@ public class NettyConsumer extends DefaultConsumer { this.context = this.getEndpoint().getCamelContext(); this.configuration = configuration; this.allChannels = new DefaultChannelGroup("NettyConsumer-" + nettyEndpoint.getEndpointUri()); + setExceptionHandler(new NettyConsumerExceptionHandler(this)); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java new file mode 100644 index 0000000..845b189 --- /dev/null +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.nio.channels.ClosedChannelException; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.util.CamelLogger; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NettyConsumerExceptionHandler implements ExceptionHandler { + + private static final transient Logger LOG = LoggerFactory.getLogger(NettyConsumer.class); + private final CamelLogger logger; + private final LoggingLevel closedLoggingLevel; + + public NettyConsumerExceptionHandler(NettyConsumer consumer) { + this.logger = new CamelLogger(LOG, consumer.getConfiguration().getServerExceptionCaughtLogLevel()); + this.closedLoggingLevel = consumer.getConfiguration().getServerClosedChannelExceptionCaughtLogLevel(); + } + + @Override + public void handleException(Throwable exception) { + handleException(null, null, exception); + } + + @Override + public void handleException(String message, Throwable exception) { + handleException(message, null, exception); + } + + @Override + public void handleException(String message, Exchange exchange, Throwable exception) { + try { + String msg = CamelExchangeException.createExceptionMessage(message, exchange, exception); + boolean closed = ObjectHelper.getException(ClosedChannelException.class, exception) != null; + if (closed) { + logger.log(msg, exception, closedLoggingLevel); + } else { + logger.log(msg, exception); + } + } catch (Throwable e) { + // the logging exception handler must not cause new exceptions to occur + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java 
---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java index 8e0b4b0..317d377 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java @@ -41,8 +41,8 @@ import org.slf4j.LoggerFactory; public class ServerChannelHandler extends SimpleChannelUpstreamHandler { // use NettyConsumer as logger to make it easier to read the logs as this is part of the consumer private static final transient Logger LOG = LoggerFactory.getLogger(NettyConsumer.class); - private NettyConsumer consumer; - private CamelLogger noReplyLogger; + private final NettyConsumer consumer; + private final CamelLogger noReplyLogger; public ServerChannelHandler(NettyConsumer consumer) { this.consumer = consumer; @@ -71,8 +71,8 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler { public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception { // only close if we are still allowed to run if (consumer.isRunAllowed()) { - LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause()); - + // let the exception handler deal with it + consumer.getExceptionHandler().handleException("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause()); // close channel in case an exception was thrown NettyHelper.close(exceptionEvent.getChannel()); } http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java new file mode 100644 index 0000000..94f9e79 --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.jboss.netty.handler.timeout.ReadTimeoutException; +import org.junit.Test; + +/** + * @version + */ +public class NettyRequestTimeoutTest extends BaseNettyTest { + + @Test + public void testRequestTimeoutOK() throws Exception { + String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=5000", "Hello Camel", String.class); + assertEquals("Bye World", out); + } + + @Test + public void testRequestTimeout() throws Exception { + try { + template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class); + fail("Should have thrown exception"); + } catch (CamelExecutionException e) { + ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause()); + assertNotNull(cause); + } + } + + @Test + public void testRequestTimeoutAndOk() throws Exception { + try { + template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class); + fail("Should have thrown exception"); + } catch (CamelExecutionException e) { + ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause()); + assertNotNull(cause); + } + + // now we try again but this time the is no delay on server and thus faster + String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello World", String.class); + assertEquals("Bye World", out); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty:tcp://localhost:{{port}}?textline=true&sync=true") + .process(new Processor() { + @Override + public 
void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + + if (body.contains("Camel")) { + Thread.sleep(3000); + } + } + }) + .transform().constant("Bye World"); + + } + }; + } +}