CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2df8ed4e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2df8ed4e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2df8ed4e Branch: refs/heads/camel-2.15.x Commit: 2df8ed4e3f0dba69dad59745c9ebc68600c1ddf4 Parents: 5347e63 Author: Jonathan Anstey <jans...@gmail.com> Authored: Thu Nov 26 16:05:37 2015 -0330 Committer: Jonathan Anstey <jans...@gmail.com> Committed: Thu Nov 26 16:06:26 2015 -0330 ---------------------------------------------------------------------- .../netty4/handlers/ClientChannelHandler.java | 21 +++-- .../netty4/handlers/ServerChannelHandler.java | 8 +- .../component/netty4/NettyProducerHangTest.java | 93 ++++++++++++++++++++ 3 files changed, 113 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2df8ed4e/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java index e7d0d13..81c67cc 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java @@ -24,6 +24,7 @@ import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.component.netty4.NettyCamelState; +import org.apache.camel.component.netty4.NettyConfiguration; import org.apache.camel.component.netty4.NettyConstants; import org.apache.camel.component.netty4.NettyHelper; import org.apache.camel.component.netty4.NettyPayloadHelper; @@ -47,12 +48,14 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { } @Override - public void channelActive(ChannelHandlerContext ctx) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { LOG.trace("Channel open: {}", ctx.channel()); } // to keep track of open sockets producer.getAllChannels().add(ctx.channel()); + + super.channelActive(ctx); } @Override @@ -89,7 +92,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { } @Override - public void channelInactive(ChannelHandlerContext ctx) { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { LOG.trace("Channel closed: {}", ctx.channel()); } @@ -103,18 +106,23 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { // to keep track of open sockets producer.getAllChannels().remove(ctx.channel()); - if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) { + NettyConfiguration configuration = producer.getConfiguration(); + if (configuration.isSync() && !exceptionHandled) { // To avoid call the callback.done twice exceptionHandled = true; // session was closed but no message received. This could be because the remote server had an internal error - // and could not return a response. We should count down to stop waiting for a response + // and could not return a response. We should count down to stop waiting for a response + String address = configuration != null ? configuration.getAddress() : ""; if (LOG.isDebugEnabled()) { - LOG.debug("Channel closed but no message received from address: {}", producer.getConfiguration().getAddress()); + LOG.debug("Channel closed but no message received from address: {}", address); } - exchange.setException(new CamelExchangeException("No response received from remote server: " + producer.getConfiguration().getAddress(), exchange)); + exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange)); // signal callback callback.done(false); } + + // make sure the event can be processed by other handlers + super.channelInactive(ctx); } @Override @@ -198,7 +206,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { * @throws Exception is thrown if error getting the response message */ protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) throws Exception { - Object body = message; if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/camel/blob/2df8ed4e/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java index 0bb93fc..0df5ff0 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java @@ -48,21 +48,25 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> { } @Override - public void channelActive(ChannelHandlerContext ctx) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { LOG.trace("Channel open: {}", ctx.channel()); } // to keep track of open sockets consumer.getNettyServerBootstrapFactory().addChannel(ctx.channel()); + + super.channelActive(ctx); } @Override - public void channelInactive(ChannelHandlerContext ctx) { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { LOG.trace("Channel closed: {}", ctx.channel()); } // to keep track of open sockets consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel()); + + super.channelInactive(ctx); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/2df8ed4e/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java new file mode 100644 index 0000000..199180d --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; + +public class NettyProducerHangTest extends CamelTestSupport { + + private static int PORT = 4093; + + @Test + public void nettyProducerHangsOnTheSecondRequestToTheSocketWhichIsClosed() throws Exception { + new Thread(new Runnable() { + @Override + public void run() { + try { + acceptReplyAcceptClose(); + acceptReplyAcceptClose(); + } catch (IOException e) { + log.error("Exception occured: " + e.getMessage(), e); + } + } + }).start(); + + String response1 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request1", String.class); + log.info("Received first response <" + response1 + ">"); + + try { + // our test server will close the socket now so we should get an error + template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request2", String.class); + } catch (Exception e) { + assertStringContains(e.getCause().getMessage(), "No response received from remote server"); + } + + String response2 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request3", String.class); + log.info("Received 2nd response <" + response2 + ">"); + + try { + // our test server will close the socket now so we should get an error + template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request4", String.class); + } catch (Exception e) { + assertStringContains(e.getCause().getMessage(), "No response received from remote server"); + } + } + + private void acceptReplyAcceptClose() throws IOException { + byte buf[] = new byte[128]; + + ServerSocket serverSocket = new ServerSocket(PORT); + Socket soc = serverSocket.accept(); + + log.info("Open socket and accept data"); + try (InputStream is = soc.getInputStream(); + OutputStream os = soc.getOutputStream()) { + // read first message + is.read(buf); + + // reply to the first message + os.write("response\n".getBytes()); + + // read second message + is.read(buf); + + // do not reply, just close socket (emulate network problem) + } finally { + soc.close(); + serverSocket.close(); + } + log.info("Close socket"); + } + +}