camel-netty-http: Added support for chunked transfer encoding in netty http client.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/328e6e3f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/328e6e3f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/328e6e3f Branch: refs/heads/master Commit: 328e6e3f91fcdbbf880f2b60fc3ac5c84afed61b Parents: 01957e8 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Aug 6 11:59:27 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Aug 6 11:59:27 2013 +0200 ---------------------------------------------------------------------- .../netty/http/DefaultNettyHttpBinding.java | 1 - .../http/handlers/HttpClientChannelHandler.java | 58 +++++++++++++++++-- .../netty/http/NettyHttpClientChunkedTest.java | 46 +++++++++++++++ tests/camel-itest/pom.xml | 5 ++ .../NettyHttpClientChunkedResponseTest.java | 60 ++++++++++++++++++++ .../src/test/resources/log4j.properties | 3 +- 6 files changed, 167 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java index f739f81..e64b800 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java @@ -21,7 +21,6 @@ import java.io.ObjectOutputStream; import java.io.PrintWriter; import java.io.StringWriter; import java.net.URI; -import java.net.URL; import java.net.URLDecoder; import java.nio.charset.Charset; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java index cdee347..64c398c 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java @@ -16,12 +16,18 @@ */ package org.apache.camel.component.netty.http.handlers; +import java.util.Map; + import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.component.netty.handlers.ClientChannelHandler; import org.apache.camel.component.netty.http.NettyHttpProducer; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.handler.codec.http.HttpChunk; +import org.jboss.netty.handler.codec.http.HttpChunkTrailer; import org.jboss.netty.handler.codec.http.HttpResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +43,7 @@ public class HttpClientChannelHandler extends ClientChannelHandler { private static final transient Logger LOG = LoggerFactory.getLogger(NettyHttpProducer.class); private final NettyHttpProducer producer; private HttpResponse response; + private ChannelBuffer buffer; public HttpClientChannelHandler(NettyHttpProducer producer) { super(producer); @@ -47,13 +54,56 @@ public class HttpClientChannelHandler extends ClientChannelHandler { public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception { // store response, as this channel handler is created per pipeline Object msg = messageEvent.getMessage(); - if (msg instanceof HttpResponse) { + + // it may be a chunked message + if (msg instanceof HttpChunk) { + HttpChunk chunk = (HttpChunk) msg; + if (LOG.isTraceEnabled()) { + LOG.trace("HttpChunk received: {} isLast: {}", chunk, chunk.isLast()); + } + + if (msg instanceof HttpChunkTrailer) { + // chunk trailer only has headers + HttpChunkTrailer trailer = (HttpChunkTrailer) msg; + for (Map.Entry<String, String> entry : trailer.getHeaders()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Adding trailing header {}={}", entry.getKey(), entry.getValue()); + } + response.addHeader(entry.getKey(), entry.getValue()); + } + } else { + // append chunked content + buffer.writeBytes(chunk.getContent()); + if (LOG.isTraceEnabled()) { + LOG.trace("Wrote {} bytes to chunk buffer", buffer.writerIndex()); + } + } + if (chunk.isLast()) { + // the content is a copy of the buffer with the actual data we wrote to it + int end = buffer.writerIndex(); + ChannelBuffer copy = buffer.copy(0, end); + // the copy must not be readable when the content was chunked, so set the index to the end + copy.setIndex(end, end); + response.setContent(copy); + // we the all the content now, so call super to process the received message + super.messageReceived(ctx, messageEvent); + } + } else if (msg instanceof HttpResponse) { response = (HttpResponse) msg; - super.messageReceived(ctx, messageEvent); + if (LOG.isTraceEnabled()) { + LOG.trace("HttpResponse received: {} chunked:", response, response.isChunked()); + } + if (!response.isChunked()) { + // the response is not chunked so we have all the content + super.messageReceived(ctx, messageEvent); + } else { + // the response is chunkced so use a dynamic buffer to receive the content in chunks + buffer = ChannelBuffers.dynamicBuffer(); + } } else { // ignore not supported message - if (msg != null) { - LOG.trace("Ignoring non HttpResponse message of type {} -> {}", msg.getClass(), msg); + if (LOG.isTraceEnabled() && msg != null) { + LOG.trace("Ignoring non supported response message of type {} -> {}", msg.getClass(), msg); } } http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpClientChunkedTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpClientChunkedTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpClientChunkedTest.java new file mode 100644 index 0000000..5868c06 --- /dev/null +++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpClientChunkedTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty.http; + +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class NettyHttpClientChunkedTest extends BaseNettyTest { + + @Test + public void testHttpSimple() throws Exception { + getMockEndpoint("mock:input").expectedBodiesReceived("Hello World"); + + String out = template.requestBody("netty-http:http://localhost:{{port}}/foo", "Hello World", String.class); + assertEquals("Bye World", out); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty-http:http://0.0.0.0:{{port}}/foo") + .to("mock:input") + .transform().constant("Bye World"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/tests/camel-itest/pom.xml ---------------------------------------------------------------------- diff --git a/tests/camel-itest/pom.xml b/tests/camel-itest/pom.xml index 47d2748..7109c7a 100644 --- a/tests/camel-itest/pom.xml +++ b/tests/camel-itest/pom.xml @@ -109,6 +109,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-netty-http</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-quartz</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/tests/camel-itest/src/test/java/org/apache/camel/itest/nettyhttp/NettyHttpClientChunkedResponseTest.java ---------------------------------------------------------------------- diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/nettyhttp/NettyHttpClientChunkedResponseTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/nettyhttp/NettyHttpClientChunkedResponseTest.java new file mode 100644 index 0000000..88cd3be --- /dev/null +++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/nettyhttp/NettyHttpClientChunkedResponseTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.itest.nettyhttp; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class NettyHttpClientChunkedResponseTest extends CamelTestSupport { + + private int port; + + @Test + public void testNettyHttpClientChunked() throws Exception { + Exchange out = template.request("netty-http:http://localhost:" + port + "/test", new Processor() { +// Exchange out = template.request("jetty:http://localhost:" + port + "/test", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Camel in chunks."); + } + }); + + assertNotNull(out); + + assertEquals("Bye Camel in chunks.", out.getOut().getBody(String.class)); + assertEquals("chunked", out.getOut().getHeader("Transfer-Encoding")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + port = AvailablePortFinder.getNextAvailable(8000); + + // use jetty as server as it supports sending response as chunked encoding + from("jetty:http://localhost:" + port + "/test") + .setHeader("Transfer-Encoding", constant("chunked")) + .transform().simple("Bye ${body}"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/tests/camel-itest/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/tests/camel-itest/src/test/resources/log4j.properties b/tests/camel-itest/src/test/resources/log4j.properties index 336a6cf..c8b19e6 100644 --- a/tests/camel-itest/src/test/resources/log4j.properties +++ b/tests/camel-itest/src/test/resources/log4j.properties @@ -16,7 +16,7 @@ ## ------------------------------------------------------------------------ # -# The logging properties used for eclipse testing, We want to see debug output on the console. +# The logging properties used for testing. # log4j.rootLogger=INFO, file @@ -26,6 +26,7 @@ log4j.rootLogger=INFO, file #log4j.logger.org.apache.camel=DEBUG #log4j.logger.org.apache.camel.component.file=TRACE #log4j.logger.org.apache.camel.component.netty=DEBUG +#log4j.logger.org.apache.camel.component.netty.http.NettyHttpProducer=TRACE #log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE #log4j.logger.org.apache.camel.component.jetty=TRACE #log4j.logger.org.apache.camel.processor.Pipeline=TRACE