CAMEL-7909 camel-netty-http consumer needs to close the connection if the response Connection header is "close"
Conflicts: components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9cf11bd5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9cf11bd5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9cf11bd5 Branch: refs/heads/camel-2.13.x Commit: 9cf11bd58146abb4938d0b546d3308d6e086b6da Parents: f0e3e20 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue Oct 14 10:14:12 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Fri Oct 17 11:35:34 2014 +0800 ---------------------------------------------------------------------- .../netty/http/DefaultNettyHttpBinding.java | 9 +++++++-- .../http/handlers/HttpServerChannelHandler.java | 19 ++----------------- .../camel/component/netty/NettyHelper.java | 2 +- .../camel/component/netty/NettyProducer.java | 13 ++++++++++--- .../handlers/ServerResponseFutureListener.java | 5 +++++ 5 files changed, 25 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java index 23cae6c..fa8fa81 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java 
@@ -34,6 +34,7 @@ import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.RuntimeCamelException; import org.apache.camel.TypeConverter; +import org.apache.camel.component.netty.NettyConstants; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.IOHelper; @@ -67,7 +68,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { public DefaultNettyHttpBinding(HeaderFilterStrategy headerFilterStrategy) { this.headerFilterStrategy = headerFilterStrategy; } - + public DefaultNettyHttpBinding copy() { try { return (DefaultNettyHttpBinding)this.clone(); @@ -384,7 +385,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { if (buffer.readerIndex() == buffer.writerIndex()) { buffer.setIndex(0, buffer.writerIndex()); } - // TODO How to enable the chunk transport + // TODO How to enable the chunk transport int len = buffer.readableBytes(); // set content-length response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, len); @@ -411,6 +412,10 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } } response.setHeader(HttpHeaders.Names.CONNECTION, connection); + // Just make sure we close the channel when the connection value is close + if (connection.equalsIgnoreCase(HttpHeaders.Values.CLOSE)) { + message.setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true); + } LOG.trace("Connection: {}", connection); return response; http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java index 8894f6a..63edef7 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.netty.http.handlers; -import java.net.SocketAddress; import java.net.URI; import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; @@ -26,7 +25,6 @@ import javax.security.auth.login.LoginException; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; -import org.apache.camel.component.netty.NettyConsumer; import org.apache.camel.component.netty.NettyHelper; import org.apache.camel.component.netty.handlers.ServerChannelHandler; import org.apache.camel.component.netty.http.HttpPrincipal; @@ -37,7 +35,6 @@ import org.apache.camel.util.CamelLogger; import org.apache.camel.util.ObjectHelper; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; @@ -49,8 +46,6 @@ import org.jboss.netty.handler.codec.http.HttpResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.jboss.netty.handler.codec.http.HttpHeaders.is100ContinueExpected; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.CONTINUE; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; @@ -58,6 +53,7 @@ import static 
org.jboss.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAV import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + /** * Netty HTTP {@link ServerChannelHandler} that handles the incoming HTTP requests and routes * the received message in Camel. @@ -268,7 +264,7 @@ public class HttpServerChannelHandler extends ServerChannelHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception { - + // only close if we are still allowed to run if (consumer.isRunAllowed()) { @@ -282,17 +278,6 @@ public class HttpServerChannelHandler extends ServerChannelHandler { } } - @Override - protected ChannelFutureListener createResponseFutureListener(NettyConsumer consumer, Exchange exchange, SocketAddress remoteAddress) { - // make sure to close channel if not keep-alive - if (request != null && isKeepAlive(request)) { - LOG.trace("Request has Connection: keep-alive so Channel is not being closed"); - return null; - } else { - LOG.trace("Request is not Connection: close so Channel is being closed"); - return ChannelFutureListener.CLOSE; - } - } @Override protected Object getResponseBody(Exchange exchange) throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java index 05f3e4d..b9368fa 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java @@ -111,7 +111,7 @@ public final class NettyHelper { public static void close(Channel 
channel) { if (channel != null) { LOG.trace("Closing channel: {}", channel); - channel.close(); + channel.close().syncUninterruptibly(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index 50de736..87ad2be 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -498,8 +498,11 @@ public class NettyProducer extends DefaultAsyncProducer { public void done(boolean doneSync) { // put back in pool try { - LOG.trace("Putting channel back to pool {}", channel); - pool.returnObject(channel); + // Only put the connected channel back to the pool + if (channel.isConnected()) { + LOG.trace("Putting channel back to pool {}", channel); + pool.returnObject(channel); + } } catch (Exception e) { LOG.warn("Error returning channel to pool {}. 
This exception will be ignored.", channel); } finally { @@ -525,7 +528,9 @@ public class NettyProducer extends DefaultAsyncProducer { @Override public void destroyObject(Channel channel) throws Exception { LOG.trace("Destroying channel: {}", channel); - NettyHelper.close(channel); + if (channel.isOpen()) { + NettyHelper.close(channel); + } allChannels.remove(channel); } @@ -540,11 +545,13 @@ public class NettyProducer extends DefaultAsyncProducer { @Override public void activateObject(Channel channel) throws Exception { // noop + LOG.trace("activateObject channel: {} -> {}", channel); } @Override public void passivateObject(Channel channel) throws Exception { // noop + LOG.trace("passivateObject channel: {} -> {}", channel); } } http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java index 619a62e..90dc9e6 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java @@ -61,6 +61,11 @@ public class ServerResponseFutureListener implements ChannelFutureListener { } else { close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); } + + // check the setting on the exchange property + if (close == null) { + close = exchange.getProperty(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } // should we disconnect, the header can override the configuration boolean disconnect = 
consumer.getConfiguration().isDisconnect();