This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 11cc3b7c7f8643a5e60abee2b72364713633e986
Author: Amos Feng <zf...@redhat.com>
AuthorDate: Fri Jul 12 11:27:16 2019 +0800

    CAMEL-12983 - Camel-netty4-http: update to add the cheunked http 
request/response and support stream writing
---
 .../component/netty4/http/ChunkedHttpRequest.java  | 75 +++++++++++++++++++
 .../component/netty4/http/ChunkedHttpResponse.java | 75 +++++++++++++++++++
 .../netty4/http/CustomChunkedWriteHandler.java     | 37 +++++++++
 .../netty4/http/DefaultNettyHttpBinding.java       | 87 +++++++++++++---------
 .../netty4/http/HttpClientInitializerFactory.java  |  1 +
 .../netty4/http/HttpServerInitializerFactory.java  |  1 +
 .../http/HttpServerSharedInitializerFactory.java   |  1 +
 7 files changed, 243 insertions(+), 34 deletions(-)

diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpRequest.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpRequest.java
new file mode 100644
index 0000000..58f5c30
--- /dev/null
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpRequest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.http;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultHttpRequest;
+import io.netty.handler.codec.http.HttpChunkedInput;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.stream.ChunkedInput;
+import io.netty.handler.stream.ChunkedStream;
+
+import java.io.InputStream;
+
+/**
+ * @author <a href="mailto:zf...@redhat.com";>Zheng Feng</a>
+ */
+public class ChunkedHttpRequest extends DefaultHttpRequest implements 
ChunkedInput<HttpContent> {
+    private HttpChunkedInput input;
+
+    public ChunkedHttpRequest(InputStream in, DefaultHttpRequest request) {
+        super(request.protocolVersion(), request.method(), request.uri());
+        this.input = new HttpChunkedInput(new ChunkedStream(in));
+    }
+
+    public DefaultHttpRequest getRequest() {
+        return new DefaultHttpRequest(this.protocolVersion(), this.method(), 
this.uri(), this.headers());
+    }
+
+    @Override
+    public boolean isEndOfInput() throws Exception {
+        return input.isEndOfInput();
+    }
+
+    @Override
+    public void close() throws Exception {
+        input.close();
+    }
+
+    @Override
+    @Deprecated
+    public HttpContent readChunk(ChannelHandlerContext channelHandlerContext) 
throws Exception {
+        return input.readChunk(channelHandlerContext);
+    }
+
+    @Override
+    public HttpContent readChunk(ByteBufAllocator byteBufAllocator) throws 
Exception {
+        return input.readChunk(byteBufAllocator);
+    }
+
+    @Override
+    public long length() {
+        return input.length();
+    }
+
+    @Override
+    public long progress() {
+        return input.progress();
+    }
+}
+
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpResponse.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpResponse.java
new file mode 100644
index 0000000..779d7d7
--- /dev/null
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.netty4.http;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.stream.ChunkedInput;
+import io.netty.handler.stream.ChunkedStream;
+
+import java.io.InputStream;
+
+/**
+ * @author <a href="mailto:zf...@redhat.com";>Zheng Feng</a>
+ */
+public class ChunkedHttpResponse extends DefaultHttpResponse implements 
ChunkedInput<HttpContent> {
+    private HttpChunkedInput input;
+
+    public ChunkedHttpResponse(InputStream in, DefaultHttpResponse response) {
+        super(response.protocolVersion(), response.status());
+        this.input = new HttpChunkedInput(new ChunkedStream(in));
+    }
+
+    public DefaultHttpResponse getResponse() {
+        return new DefaultHttpResponse(this.protocolVersion(), this.status(), 
this.headers());
+    }
+
+    @Override
+    public boolean isEndOfInput() throws Exception {
+        return input.isEndOfInput();
+    }
+
+    @Override
+    public void close() throws Exception {
+        input.close();
+    }
+
+    @Override
+    @Deprecated
+    public HttpContent readChunk(ChannelHandlerContext channelHandlerContext) 
throws Exception {
+        return input.readChunk(channelHandlerContext);
+    }
+
+    @Override
+    public HttpContent readChunk(ByteBufAllocator byteBufAllocator) throws 
Exception {
+        return input.readChunk(byteBufAllocator);
+    }
+
+    @Override
+    public long length() {
+        return input.length();
+    }
+
+    @Override
+    public long progress() {
+        return input.progress();
+    }
+}
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/CustomChunkedWriteHandler.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/CustomChunkedWriteHandler.java
new file mode 100644
index 0000000..c009ba7
--- /dev/null
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/CustomChunkedWriteHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.http;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * @author <a href="mailto:zf...@redhat.com";>Zheng Feng</a>
+ */
+public class CustomChunkedWriteHandler extends ChunkedWriteHandler {
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
+        if (msg instanceof ChunkedHttpRequest) {
+            super.write(ctx, ((ChunkedHttpRequest) msg).getRequest(), promise);
+        } else if (msg instanceof ChunkedHttpResponse) {
+            super.write(ctx, ((ChunkedHttpResponse)msg).getResponse(), 
promise);
+        }
+        super.write(ctx, msg, promise);
+    }
+
+}
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
index ee1e718..e29d8eb 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty4.http;
 
 import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -32,6 +33,8 @@ import java.util.Map;
 import io.netty.buffer.ByteBuf;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpRequest;
+import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaderNames;
@@ -58,6 +61,9 @@ import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
+import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
+
 /**
  * Default {@link NettyHttpBinding}.
  */
@@ -377,44 +383,51 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
             ExchangeHelper.setFailureHandled(message.getExchange());
         }
 
-        if (body instanceof ByteBuf) {
-            buffer = (ByteBuf) body;
-        } else {
-            // try to convert to buffer first
-            buffer = message.getBody(ByteBuf.class);
-            if (buffer == null) {
-                // fallback to byte array as last resort
-                byte[] data = message.getBody(byte[].class);
-                if (data != null) {
-                    buffer = NettyConverter.toByteBuffer(data);
-                } else {
-                    // and if byte array fails then try String
-                    String str;
-                    if (body != null) {
-                        str = message.getMandatoryBody(String.class);
+        HttpResponse response = null;
+
+        if (response == null && body instanceof InputStream && 
configuration.isDisableStreamCache()) {
+            response = new ChunkedHttpResponse((InputStream)body, new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code)));
+            response.headers().set(TRANSFER_ENCODING, CHUNKED);
+        }
+
+        if (response == null) {
+            if (body instanceof ByteBuf) {
+                buffer = (ByteBuf) body;
+            } else {
+                // try to convert to buffer first
+                buffer = message.getBody(ByteBuf.class);
+                if (buffer == null) {
+                    // fallback to byte array as last resort
+                    byte[] data = message.getBody(byte[].class);
+                    if (data != null) {
+                        buffer = NettyConverter.toByteBuffer(data);
                     } else {
-                        str = "";
+                        // and if byte array fails then try String
+                        String str;
+                        if (body != null) {
+                            str = message.getMandatoryBody(String.class);
+                        } else {
+                            str = "";
+                        }
+                        buffer = NettyConverter.toByteBuffer(str.getBytes());
                     }
-                    buffer = NettyConverter.toByteBuffer(str.getBytes());
                 }
             }
-        }
-
-        HttpResponse response;
 
-        if (buffer != null) {
-            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.valueOf(code), buffer);
-            // We just need to reset the readerIndex this time
-            if (buffer.readerIndex() == buffer.writerIndex()) {
-                buffer.setIndex(0, buffer.writerIndex());
+            if (buffer != null) {
+                response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.valueOf(code), buffer);
+                // We just need to reset the readerIndex this time
+                if (buffer.readerIndex() == buffer.writerIndex()) {
+                    buffer.setIndex(0, buffer.writerIndex());
+                }
+                // TODO How to enable the chunk transport
+                int len = buffer.readableBytes();
+                // set content-length
+                
response.headers().set(HttpHeaderNames.CONTENT_LENGTH.toString(), len);
+                LOG.trace("Content-Length: {}", len);
+            } else {
+                response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.valueOf(code));
             }
-            // TODO How to enable the chunk transport
-            int len = buffer.readableBytes();
-            // set content-length
-            response.headers().set(HttpHeaderNames.CONTENT_LENGTH.toString(), 
len);
-            LOG.trace("Content-Length: {}", len);
-        } else {
-            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.valueOf(code));
         }
 
         TypeConverter tc = 
message.getExchange().getContext().getTypeConverter();
@@ -517,7 +530,7 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
             proxyRequest = null;
         }
 
-        FullHttpRequest request = null;
+        HttpRequest request = null;
         if (message instanceof NettyHttpMessage) {
             // if the request is already given we should set the values
             // from message headers and pass on the same request
@@ -532,6 +545,11 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
             }
         }
 
+        if (request == null && body instanceof InputStream && 
configuration.isDisableStreamCache()) {
+            request = new ChunkedHttpRequest((InputStream)body, new 
DefaultHttpRequest(protocol, httpMethod, uriForRequest));
+            request.headers().set(TRANSFER_ENCODING, CHUNKED);
+        }
+
         if (request == null) {
             request = new DefaultFullHttpRequest(protocol, httpMethod, 
uriForRequest);
 
@@ -552,10 +570,11 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
                         }
                     }
                 }
+
     
                 if (buffer != null) {
                     if (buffer.readableBytes() > 0) {
-                        request = request.replace(buffer);
+                        request = 
((DefaultFullHttpRequest)request).replace(buffer);
                         int len = buffer.readableBytes();
                         // set content-length
                         
request.headers().set(HttpHeaderNames.CONTENT_LENGTH.toString(), len);
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java
index b8a0cbf..81c8b73 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java
@@ -107,6 +107,7 @@ public class HttpClientInitializerFactory extends 
ClientInitializerFactory {
             pipeline.addLast("decoder-" + x, decoder);
         }
         pipeline.addLast("aggregator", new 
HttpObjectAggregator(configuration.getChunkedMaxContentLength()));
+        pipeline.addLast("streamer", new CustomChunkedWriteHandler());
 
         if (producer.getConfiguration().getRequestTimeout() > 0) {
             if (LOG.isTraceEnabled()) {
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java
index 2549cb0..2659c84 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java
@@ -108,6 +108,7 @@ public class HttpServerInitializerFactory extends 
ServerInitializerFactory {
             pipeline.addLast("encoder-" + x, encoder);
         }
         pipeline.addLast("aggregator", new 
HttpObjectAggregator(configuration.getChunkedMaxContentLength()));
+        pipeline.addLast("streamer", new CustomChunkedWriteHandler());
         if (supportCompressed()) {
             pipeline.addLast("deflater", new HttpContentCompressor());
         }
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java
index ac62a6d..e5c50b5 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java
@@ -85,6 +85,7 @@ public class HttpServerSharedInitializerFactory extends 
HttpServerInitializerFac
         pipeline.addLast("encoder", new HttpResponseEncoder());
         if (configuration.isChunked()) {
             pipeline.addLast("aggregator", new 
HttpObjectAggregator(configuration.getChunkedMaxContentLength()));
+            pipeline.addLast("streamer", new CustomChunkedWriteHandler());
         }
         if (configuration.isCompression()) {
             pipeline.addLast("deflater", new HttpContentCompressor());

Reply via email to