CAMEL-8461 Fixed the issue that camel-netty-http does not respect client's keep-alive setting
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/48653d33 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/48653d33 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/48653d33 Branch: refs/heads/camel-2.14.x Commit: 48653d33f3ace8b509624598f02743ebbdc60704 Parents: 92182c3 Author: Willem Jiang <[email protected]> Authored: Mon Mar 9 18:07:53 2015 +0800 Committer: Willem Jiang <[email protected]> Committed: Mon Mar 9 18:38:15 2015 +0800 ---------------------------------------------------------------------- .../netty/http/DefaultNettyHttpBinding.java | 5 +++- .../http/handlers/HttpServerChannelHandler.java | 13 +++++++++-- .../http/NettyHttpProducerKeepAliveTest.java | 24 +++++++++++++++++--- 3 files changed, 36 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/48653d33/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java index 102fb91..85395b8 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java @@ -403,6 +403,10 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { // configure connection to accordingly to keep alive configuration // favor using the header from the message String connection = message.getHeader(HttpHeaders.Names.CONNECTION, String.class); + // Read the connection header from the exchange property + if (connection == null) { + connection = message.getExchange().getProperty(HttpHeaders.Names.CONNECTION, String.class); + } if (connection == null) { // fallback and use the keep alive from the configuration if (configuration.isKeepAlive()) { @@ -531,7 +535,6 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } request.headers().set(HttpHeaders.Names.CONNECTION, connection); LOG.trace("Connection: {}", connection); - return request; } http://git-wip-us.apache.org/repos/asf/camel/blob/48653d33/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java index e30ee27..4888b8c 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java @@ -44,9 +44,9 @@ import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpResponse; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; @@ -209,9 +209,11 @@ public class HttpServerChannelHandler extends ServerChannelHandler { } } } - + // let Camel process this message + // It did the way as camel-netty component does super.messageReceived(ctx, messageEvent); + } protected boolean matchesRoles(String roles, String userRoles) { @@ -286,6 +288,13 @@ public class HttpServerChannelHandler extends ServerChannelHandler { exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE); exchange.setProperty(Exchange.SKIP_WWW_FORM_URLENCODED, Boolean.TRUE); } + HttpRequest request = (HttpRequest) messageEvent.getMessage(); + // setup the connection property in case of the message header is removed + boolean keepAlive = HttpHeaders.isKeepAlive(request); + if (!keepAlive) { + // Just make sure we close the connection this time. + exchange.setProperty(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/48653d33/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpProducerKeepAliveTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpProducerKeepAliveTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpProducerKeepAliveTest.java index 9d1dd95..71cbb43 100644 --- a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpProducerKeepAliveTest.java +++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpProducerKeepAliveTest.java @@ -16,8 +16,10 @@ */ package org.apache.camel.component.netty.http; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; -import org.junit.Ignore; +import org.jboss.netty.handler.codec.http.HttpHeaders; import org.junit.Test; public class NettyHttpProducerKeepAliveTest extends BaseNettyTest { @@ -36,7 +38,6 @@ public class NettyHttpProducerKeepAliveTest extends BaseNettyTest { } @Test - @Ignore("Can fail on some CI servers") public void testHttpKeepAliveFalse() throws Exception { getMockEndpoint("mock:input").expectedBodiesReceived("Hello World", "Hello Again"); @@ -48,15 +49,32 @@ public class NettyHttpProducerKeepAliveTest extends BaseNettyTest { assertMockEndpointsSatisfied(); } + + @Test + public void testConnectionClosed() throws Exception { + getMockEndpoint("mock:input").expectedBodiesReceived("Hello World"); + Exchange ex = template.request("netty-http:http://localhost:{{port}}/bar?keepAlive=false", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello World"); + } + + }); + assertMockEndpointsSatisfied(); + assertEquals(HttpHeaders.Values.CLOSE, ex.getOut().getHeader(HttpHeaders.Names.CONNECTION)); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { - from("netty-http:http://0.0.0.0:{{port}}/foo") + from("netty-http:http://localhost:{{port}}/foo") .to("mock:input") .transform().constant("Bye World"); + + from("netty-http:http://localhost:{{port}}/bar").removeHeaders("*").to("mock:input").transform().constant("Bye World"); } }; }
