Repository: camel Updated Branches: refs/heads/camel-2.15.x 5347e630a -> 2df8ed4e3 refs/heads/camel-2.16.x 083b89bfa -> 7bb44ce60 refs/heads/master 3f749f781 -> 2c96cb137
CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2c96cb13 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2c96cb13 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2c96cb13 Branch: refs/heads/master Commit: 2c96cb1374cccb30525e5330190006a1e1fb79f0 Parents: 3f749f7 Author: Jonathan Anstey <jans...@gmail.com> Authored: Thu Nov 26 16:05:37 2015 -0330 Committer: Jonathan Anstey <jans...@gmail.com> Committed: Thu Nov 26 16:05:37 2015 -0330 ---------------------------------------------------------------------- .../netty4/handlers/ClientChannelHandler.java | 21 +++-- .../netty4/handlers/ServerChannelHandler.java | 8 +- .../component/netty4/NettyProducerHangTest.java | 93 ++++++++++++++++++++ 3 files changed, 113 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2c96cb13/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java index dd64cb6..8905540 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java @@ -24,6 +24,7 @@ import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.component.netty4.NettyCamelState; +import org.apache.camel.component.netty4.NettyConfiguration; import org.apache.camel.component.netty4.NettyConstants; import org.apache.camel.component.netty4.NettyHelper; import org.apache.camel.component.netty4.NettyPayloadHelper; @@ -47,12 +48,14 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { } @Override - public void channelActive(ChannelHandlerContext ctx) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { LOG.trace("Channel open: {}", ctx.channel()); } // to keep track of open sockets producer.getAllChannels().add(ctx.channel()); + + super.channelActive(ctx); } @Override @@ -94,7 +97,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { } @Override - public void channelInactive(ChannelHandlerContext ctx) { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { LOG.trace("Channel closed: {}", ctx.channel()); } @@ -108,18 +111,23 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { // to keep track of open sockets producer.getAllChannels().remove(ctx.channel()); - if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) { + NettyConfiguration configuration = producer.getConfiguration(); + if (configuration.isSync() && !exceptionHandled) { // To avoid call the callback.done twice exceptionHandled = true; // session was closed but no message received. This could be because the remote server had an internal error - // and could not return a response. We should count down to stop waiting for a response + // and could not return a response. We should count down to stop waiting for a response + String address = configuration != null ? configuration.getAddress() : ""; if (LOG.isDebugEnabled()) { - LOG.debug("Channel closed but no message received from address: {}", producer.getConfiguration().getAddress()); + LOG.debug("Channel closed but no message received from address: {}", address); } - exchange.setException(new CamelExchangeException("No response received from remote server: " + producer.getConfiguration().getAddress(), exchange)); + exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange)); // signal callback callback.done(false); } + + // make sure the event can be processed by other handlers + super.channelInactive(ctx); } @Override @@ -202,7 +210,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { * @throws Exception is thrown if error getting the response message */ protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) throws Exception { - Object body = message; if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/camel/blob/2c96cb13/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java index 0bb93fc..0df5ff0 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java @@ -48,21 +48,25 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> { } @Override - public void channelActive(ChannelHandlerContext ctx) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { LOG.trace("Channel open: {}", ctx.channel()); } // to keep track of open sockets consumer.getNettyServerBootstrapFactory().addChannel(ctx.channel()); + + super.channelActive(ctx); } @Override - public void channelInactive(ChannelHandlerContext ctx) { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { LOG.trace("Channel closed: {}", ctx.channel()); } // to keep track of open sockets consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel()); + + super.channelInactive(ctx); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/2c96cb13/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java new file mode 100644 index 0000000..199180d --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; + +public class NettyProducerHangTest extends CamelTestSupport { + + private static int PORT = 4093; + + @Test + public void nettyProducerHangsOnTheSecondRequestToTheSocketWhichIsClosed() throws Exception { + new Thread(new Runnable() { + @Override + public void run() { + try { + acceptReplyAcceptClose(); + acceptReplyAcceptClose(); + } catch (IOException e) { + log.error("Exception occured: " + e.getMessage(), e); + } + } + }).start(); + + String response1 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request1", String.class); + log.info("Received first response <" + response1 + ">"); + + try { + // our test server will close the socket now so we should get an error + template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request2", String.class); + } catch (Exception e) { + assertStringContains(e.getCause().getMessage(), "No response received from remote server"); + } + + String response2 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request3", String.class); + log.info("Received 2nd response <" + response2 + ">"); + + try { + // our test server will close the socket now so we should get an error + template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request4", String.class); + } catch (Exception e) { + assertStringContains(e.getCause().getMessage(), "No response received from remote server"); + } + } + + private void acceptReplyAcceptClose() throws IOException { + byte buf[] = new byte[128]; + + ServerSocket serverSocket = new ServerSocket(PORT); + Socket soc = serverSocket.accept(); + + log.info("Open socket and accept data"); + try (InputStream is = soc.getInputStream(); + OutputStream os = soc.getOutputStream()) { + // read first message + is.read(buf); + + // reply to the first message + os.write("response\n".getBytes()); + + // read second message + is.read(buf); + + // do not reply, just close socket (emulate network problem) + } finally { + soc.close(); + serverSocket.close(); + } + log.info("Close socket"); + } + +}