camel-netty-http: Added support for chunked transfer encoding in netty http 
client.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/328e6e3f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/328e6e3f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/328e6e3f

Branch: refs/heads/master
Commit: 328e6e3f91fcdbbf880f2b60fc3ac5c84afed61b
Parents: 01957e8
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue Aug 6 11:59:27 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Aug 6 11:59:27 2013 +0200

----------------------------------------------------------------------
 .../netty/http/DefaultNettyHttpBinding.java     |  1 -
 .../http/handlers/HttpClientChannelHandler.java | 58 +++++++++++++++++--
 .../netty/http/NettyHttpClientChunkedTest.java  | 46 +++++++++++++++
 tests/camel-itest/pom.xml                       |  5 ++
 .../NettyHttpClientChunkedResponseTest.java     | 60 ++++++++++++++++++++
 .../src/test/resources/log4j.properties         |  3 +-
 6 files changed, 167 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
index f739f81..e64b800 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
@@ -21,7 +21,6 @@ import java.io.ObjectOutputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URI;
-import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.Iterator;

http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java
index cdee347..64c398c 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java
@@ -16,12 +16,18 @@
  */
 package org.apache.camel.component.netty.http.handlers;
 
+import java.util.Map;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.component.netty.handlers.ClientChannelHandler;
 import org.apache.camel.component.netty.http.NettyHttpProducer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpChunkTrailer;
 import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +43,7 @@ public class HttpClientChannelHandler extends 
ClientChannelHandler {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(NettyHttpProducer.class);
     private final NettyHttpProducer producer;
     private HttpResponse response;
+    private ChannelBuffer buffer;
 
     public HttpClientChannelHandler(NettyHttpProducer producer) {
         super(producer);
@@ -47,13 +54,56 @@ public class HttpClientChannelHandler extends 
ClientChannelHandler {
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent 
messageEvent) throws Exception {
         // store response, as this channel handler is created per pipeline
         Object msg = messageEvent.getMessage();
-        if (msg instanceof HttpResponse) {
+
+        // it may be a chunked message
+        if (msg instanceof HttpChunk) {
+            HttpChunk chunk = (HttpChunk) msg;
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("HttpChunk received: {} isLast: {}", chunk, 
chunk.isLast());
+            }
+
+            if (msg instanceof HttpChunkTrailer) {
+                // chunk trailer only has headers
+                HttpChunkTrailer trailer = (HttpChunkTrailer) msg;
+                for (Map.Entry<String, String> entry : trailer.getHeaders()) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Adding trailing header {}={}", 
entry.getKey(), entry.getValue());
+                    }
+                    response.addHeader(entry.getKey(), entry.getValue());
+                }
+            } else {
+                // append chunked content
+                buffer.writeBytes(chunk.getContent());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Wrote {} bytes to chunk buffer", 
buffer.writerIndex());
+                }
+            }
+            if (chunk.isLast()) {
+                // the content is a copy of the buffer with the actual data we 
wrote to it
+                int end = buffer.writerIndex();
+                ChannelBuffer copy = buffer.copy(0, end);
+                // the copy must not be readable when the content was chunked, 
so set the index to the end
+                copy.setIndex(end, end);
+                response.setContent(copy);
+                // we have all the content now, so call super to process the 
received message
+                super.messageReceived(ctx, messageEvent);
+            }
+        } else if (msg instanceof HttpResponse) {
             response = (HttpResponse) msg;
-            super.messageReceived(ctx, messageEvent);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("HttpResponse received: {} chunked:", response, 
response.isChunked());
+            }
+            if (!response.isChunked()) {
+                // the response is not chunked so we have all the content
+                super.messageReceived(ctx, messageEvent);
+            } else {
+                // the response is chunked so use a dynamic buffer to receive 
the content in chunks
+                buffer = ChannelBuffers.dynamicBuffer();
+            }
         } else {
             // ignore not supported message
-            if (msg != null) {
-                LOG.trace("Ignoring non HttpResponse message of type {} -> 
{}", msg.getClass(), msg);
+            if (LOG.isTraceEnabled() && msg != null) {
+                LOG.trace("Ignoring non supported response message of type {} 
-> {}", msg.getClass(), msg);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpClientChunkedTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpClientChunkedTest.java
 
b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpClientChunkedTest.java
new file mode 100644
index 0000000..5868c06
--- /dev/null
+++ 
b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpClientChunkedTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class NettyHttpClientChunkedTest extends BaseNettyTest {
+
+    @Test
+    public void testHttpSimple() throws Exception {
+        getMockEndpoint("mock:input").expectedBodiesReceived("Hello World");
+
+        String out = 
template.requestBody("netty-http:http://localhost:{{port}}/foo", "Hello World", 
String.class);
+        assertEquals("Bye World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("netty-http:http://0.0.0.0:{{port}}/foo")
+                    .to("mock:input")
+                    .transform().constant("Bye World");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/tests/camel-itest/pom.xml
----------------------------------------------------------------------
diff --git a/tests/camel-itest/pom.xml b/tests/camel-itest/pom.xml
index 47d2748..7109c7a 100644
--- a/tests/camel-itest/pom.xml
+++ b/tests/camel-itest/pom.xml
@@ -109,6 +109,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-netty-http</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-quartz</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/tests/camel-itest/src/test/java/org/apache/camel/itest/nettyhttp/NettyHttpClientChunkedResponseTest.java
----------------------------------------------------------------------
diff --git 
a/tests/camel-itest/src/test/java/org/apache/camel/itest/nettyhttp/NettyHttpClientChunkedResponseTest.java
 
b/tests/camel-itest/src/test/java/org/apache/camel/itest/nettyhttp/NettyHttpClientChunkedResponseTest.java
new file mode 100644
index 0000000..88cd3be
--- /dev/null
+++ 
b/tests/camel-itest/src/test/java/org/apache/camel/itest/nettyhttp/NettyHttpClientChunkedResponseTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.nettyhttp;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class NettyHttpClientChunkedResponseTest extends CamelTestSupport {
+
+    private int port;
+
+    @Test
+    public void testNettyHttpClientChunked() throws Exception {
+        Exchange out = template.request("netty-http:http://localhost:" + port 
+ "/test", new Processor() {
+//        Exchange out = template.request("jetty:http://localhost:" + port + 
"/test", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Camel in chunks.");
+            }
+        });
+
+        assertNotNull(out);
+
+        assertEquals("Bye Camel in chunks.", 
out.getOut().getBody(String.class));
+        assertEquals("chunked", out.getOut().getHeader("Transfer-Encoding"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                port = AvailablePortFinder.getNextAvailable(8000);
+
+                // use jetty as server as it supports sending response as 
chunked encoding
+                from("jetty:http://localhost:" + port + "/test")
+                    .setHeader("Transfer-Encoding", constant("chunked"))
+                    .transform().simple("Bye ${body}");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/328e6e3f/tests/camel-itest/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/resources/log4j.properties 
b/tests/camel-itest/src/test/resources/log4j.properties
index 336a6cf..c8b19e6 100644
--- a/tests/camel-itest/src/test/resources/log4j.properties
+++ b/tests/camel-itest/src/test/resources/log4j.properties
@@ -16,7 +16,7 @@
 ## ------------------------------------------------------------------------
 
 #
-# The logging properties used for eclipse testing, We want to see debug output 
on the console.
+# The logging properties used for testing.
 #
 log4j.rootLogger=INFO, file
 
@@ -26,6 +26,7 @@ log4j.rootLogger=INFO, file
 #log4j.logger.org.apache.camel=DEBUG
 #log4j.logger.org.apache.camel.component.file=TRACE
 #log4j.logger.org.apache.camel.component.netty=DEBUG
+#log4j.logger.org.apache.camel.component.netty.http.NettyHttpProducer=TRACE
 #log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE
 #log4j.logger.org.apache.camel.component.jetty=TRACE
 #log4j.logger.org.apache.camel.processor.Pipeline=TRACE

Reply via email to