CAMEL-7909 Merged the patch into camel-netty4-http module
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/64389b1e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/64389b1e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/64389b1e Branch: refs/heads/camel-2.14.x Commit: 64389b1eaae59c9c1365b83452bf09f42a900744 Parents: a733210 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue Oct 14 10:39:33 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Fri Oct 17 11:25:57 2014 +0800 ---------------------------------------------------------------------- .../component/netty4/http/DefaultNettyHttpBinding.java | 5 +++++ .../netty4/http/handlers/HttpServerChannelHandler.java | 12 ------------ .../org/apache/camel/component/netty4/NettyHelper.java | 2 +- .../apache/camel/component/netty4/NettyProducer.java | 13 ++++++++++--- .../netty4/handlers/ServerResponseFutureListener.java | 5 +++++ 5 files changed, 21 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/64389b1e/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java index dc1c8d3..ceb64c4 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java @@ -45,6 +45,7 @@ import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.RuntimeCamelException; import org.apache.camel.TypeConverter; +import org.apache.camel.component.netty4.NettyConstants; import org.apache.camel.component.netty4.NettyConverter; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.util.ExchangeHelper; @@ -418,6 +419,10 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } } response.headers().set(HttpHeaders.Names.CONNECTION, connection); + // Just make sure we close the channel when the connection value is close + if (connection.equalsIgnoreCase(HttpHeaders.Values.CLOSE)) { + message.setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true); + } LOG.trace("Connection: {}", connection); return response; http://git-wip-us.apache.org/repos/asf/camel/blob/64389b1e/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/handlers/HttpServerChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/handlers/HttpServerChannelHandler.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/handlers/HttpServerChannelHandler.java index eb33c4c..7e72a86 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/handlers/HttpServerChannelHandler.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/handlers/HttpServerChannelHandler.java @@ -295,18 +295,6 @@ public class HttpServerChannelHandler extends ServerChannelHandler { } @Override - protected ChannelFutureListener createResponseFutureListener(NettyConsumer consumer, Exchange exchange, SocketAddress remoteAddress) { - // make sure to close channel if not keep-alive - if (request != null && isKeepAlive(request)) { - LOG.trace("Request has Connection: keep-alive so Channel is not being closed"); - return null; - } else { - LOG.trace("Request is not Connection: close so Channel is being closed"); - return ChannelFutureListener.CLOSE; - } - } - - @Override protected Object getResponseBody(Exchange exchange) throws Exception { // use the binding if (exchange.hasOut()) { http://git-wip-us.apache.org/repos/asf/camel/blob/64389b1e/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java index b345cca..5a25928 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java @@ -118,7 +118,7 @@ public final class NettyHelper { public static void close(Channel channel) { if (channel != null) { LOG.trace("Closing channel: {}", channel); - channel.close(); + channel.close().syncUninterruptibly(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/64389b1e/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java index 7f54990..11f8b55 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java @@ -464,8 +464,11 @@ public class NettyProducer extends DefaultAsyncProducer { public void done(boolean doneSync) { // put back in pool try { - LOG.trace("Putting channel back to pool {}", channel); - pool.returnObject(channel); + // Only put the connected channel back to the pool + if (channel.isActive()) { + LOG.trace("Putting channel back to pool {}", channel); + pool.returnObject(channel); + } } catch (Exception e) { LOG.warn("Error returning channel to pool {}. This exception will be ignored.", channel); } finally { @@ -491,7 +494,9 @@ public class NettyProducer extends DefaultAsyncProducer { @Override public void destroyObject(Channel channel) throws Exception { LOG.trace("Destroying channel: {}", channel); - NettyHelper.close(channel); + if (channel.isOpen()) { + NettyHelper.close(channel); + } allChannels.remove(channel); } @@ -506,11 +511,13 @@ public class NettyProducer extends DefaultAsyncProducer { @Override public void activateObject(Channel channel) throws Exception { // noop + LOG.trace("activateObject channel: {} -> {}", channel); } @Override public void passivateObject(Channel channel) throws Exception { // noop + LOG.trace("passivateObject channel: {} -> {}", channel); } } http://git-wip-us.apache.org/repos/asf/camel/blob/64389b1e/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerResponseFutureListener.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerResponseFutureListener.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerResponseFutureListener.java index 759b69d..3d33e62 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerResponseFutureListener.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerResponseFutureListener.java @@ -62,6 +62,11 @@ public class ServerResponseFutureListener implements ChannelFutureListener { } else { close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); } + + // check the setting on the exchange property + if (close == null) { + close = exchange.getProperty(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } // should we disconnect, the header can override the configuration boolean disconnect = consumer.getConfiguration().isDisconnect();