This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new fbed4bd  CAMEL-14017 - Update to support the full streaming on both 
inbound and outbound (#3209)
fbed4bd is described below

commit fbed4bda80840cb910d312388d0bf855141b1e55
Author: Amos Feng <zf...@redhat.com>
AuthorDate: Mon Sep 30 11:41:02 2019 +0800

    CAMEL-14017 - Update to support the full streaming on both inbound and 
outbound (#3209)
    
    - Add the HttpInboundStreamHandler
      - Refactor the CustomWriterHandler to HttpOutboundStreamHandler
      - fix an issue in the writer handler to create a new promise after 
writing the request/response headers
      - create the NettyHttpStreamTest
---
 .../netty/http/DefaultNettyHttpBinding.java        |  48 +++++--
 .../netty/http/HttpClientInitializerFactory.java   |   7 +-
 .../netty/http/HttpServerInitializerFactory.java   |   7 +-
 .../http/HttpServerSharedInitializerFactory.java   |   5 +-
 ...eHandler.java => InboundStreamHttpRequest.java} |  30 ++--
 ...Handler.java => InboundStreamHttpResponse.java} |  30 ++--
 .../component/netty/http/NettyHttpBinding.java     |  30 +++-
 .../component/netty/http/NettyHttpEndpoint.java    |  10 +-
 ...Request.java => OutboundStreamHttpRequest.java} |   5 +-
 ...sponse.java => OutboundStreamHttpResponse.java} |   4 +-
 .../component/netty/http/RestNettyHttpBinding.java |   4 +-
 .../http/handlers/HttpClientChannelHandler.java    |  21 ++-
 .../http/handlers/HttpInboundStreamHandler.java    |  97 +++++++++++++
 .../HttpOutboundStreamHandler.java}                |  25 +++-
 .../http/handlers/HttpServerChannelHandler.java    |  16 ++-
 .../HttpServerMultiplexChannelHandler.java         |  10 +-
 .../component/netty/http/NettyHttpStreamTest.java  | 156 +++++++++++++++++++++
 17 files changed, 442 insertions(+), 63 deletions(-)

diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
index c4d7601..e5bea4a 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
@@ -113,7 +113,21 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
     }
 
     @Override
-    public void populateCamelHeaders(FullHttpRequest request, Map<String, 
Object> headers, Exchange exchange, NettyHttpConfiguration configuration) 
throws Exception {
+    public Message toCamelMessage(InboundStreamHttpRequest request, Exchange 
exchange, NettyHttpConfiguration configuration) throws Exception {
+        LOG.trace("toCamelMessage: {}", request);
+
+        NettyHttpMessage answer = new NettyHttpMessage(exchange.getContext(), 
null, null);
+        answer.setExchange(exchange);
+        if (configuration.isMapHeaders()) {
+            populateCamelHeaders(request.getHttpRequest(), 
answer.getHeaders(), exchange, configuration);
+        }
+
+        answer.setBody(request.getInputStream());
+        return answer;
+    }
+
+    @Override
+    public void populateCamelHeaders(HttpRequest request, Map<String, Object> 
headers, Exchange exchange, NettyHttpConfiguration configuration) throws 
Exception {
         LOG.trace("populateCamelHeaders: {}", request);
 
         // NOTE: these headers is applied using the same logic as 
camel-http/camel-jetty to be consistent
@@ -227,13 +241,13 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
         // if we're proxying the body is a buffer that we do not want to 
consume directly
         if (request.method().name().equals("POST") && 
request.headers().get(Exchange.CONTENT_TYPE) != null
                 && 
request.headers().get(Exchange.CONTENT_TYPE).startsWith(NettyHttpConstants.CONTENT_TYPE_WWW_FORM_URLENCODED)
-                && !configuration.isBridgeEndpoint() && 
!configuration.isHttpProxy()) {
+                && !configuration.isBridgeEndpoint() && 
!configuration.isHttpProxy() && request instanceof FullHttpRequest) {
 
             String charset = "UTF-8";
 
             // Push POST form params into the headers to retain compatibility 
with DefaultHttpBinding
             String body;
-            ByteBuf buffer = request.content();
+            ByteBuf buffer = ((FullHttpRequest)request).content();
             try {
                 body = buffer.toString(Charset.forName(charset));
             } finally {
@@ -306,7 +320,21 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
     }
 
     @Override
-    public void populateCamelHeaders(FullHttpResponse response, Map<String, 
Object> headers, Exchange exchange, NettyHttpConfiguration configuration) 
throws Exception {
+    public Message toCamelMessage(InboundStreamHttpResponse response, Exchange 
exchange, NettyHttpConfiguration configuration) throws Exception {
+        LOG.trace("toCamelMessage: {}", response);
+
+        NettyHttpMessage answer = new NettyHttpMessage(exchange.getContext(), 
null, null);
+        answer.setExchange(exchange);
+        if (configuration.isMapHeaders()) {
+            populateCamelHeaders(response.getHttpResponse(), 
answer.getHeaders(), exchange, configuration);
+        }
+
+        answer.setBody(response.getInputStream());
+        return answer;
+    }
+
+    @Override
+    public void populateCamelHeaders(HttpResponse response, Map<String, 
Object> headers, Exchange exchange, NettyHttpConfiguration configuration) 
throws Exception {
         LOG.trace("populateCamelHeaders: {}", response);
 
         headers.put(Exchange.HTTP_RESPONSE_CODE, response.status().code());
@@ -394,7 +422,7 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
         HttpResponse response = null;
 
         if (response == null && body instanceof InputStream && 
configuration.isDisableStreamCache()) {
-            response = new ChunkedHttpResponse((InputStream)body, new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code)));
+            response = new OutboundStreamHttpResponse((InputStream)body, new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code)));
             response.headers().set(TRANSFER_ENCODING, CHUNKED);
         }
 
@@ -547,7 +575,7 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
         }
 
         if (request == null && body instanceof InputStream && 
configuration.isDisableStreamCache()) {
-            request = new ChunkedHttpRequest((InputStream)body, new 
DefaultHttpRequest(protocol, httpMethod, uriForRequest));
+            request = new OutboundStreamHttpRequest((InputStream)body, new 
DefaultHttpRequest(protocol, httpMethod, uriForRequest));
             request.headers().set(TRANSFER_ENCODING, CHUNKED);
         }
 
@@ -572,7 +600,6 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
                     }
                 }
 
-
                 if (buffer != null) {
                     if (buffer.readableBytes() > 0) {
                         request = 
((DefaultFullHttpRequest)request).replace(buffer);
@@ -585,12 +612,11 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
                     }
                 }
             }
-
-            // update HTTP method accordingly as we know if we have a body or 
not
-            HttpMethod method = NettyHttpHelper.createMethod(message, body != 
null);
-            request.setMethod(method);
         }
 
+        // update HTTP method accordingly as we know if we have a body or not
+        HttpMethod method = NettyHttpHelper.createMethod(message, body != 
null);
+        request.setMethod(method);
 
         TypeConverter tc = 
message.getExchange().getContext().getTypeConverter();
 
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpClientInitializerFactory.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpClientInitializerFactory.java
index 94edeb5..43f6c95 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpClientInitializerFactory.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpClientInitializerFactory.java
@@ -35,6 +35,8 @@ import 
org.apache.camel.component.netty.ClientInitializerFactory;
 import org.apache.camel.component.netty.NettyConfiguration;
 import org.apache.camel.component.netty.NettyProducer;
 import org.apache.camel.component.netty.http.handlers.HttpClientChannelHandler;
+import org.apache.camel.component.netty.http.handlers.HttpInboundStreamHandler;
+import 
org.apache.camel.component.netty.http.handlers.HttpOutboundStreamHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,8 +108,11 @@ public class HttpClientInitializerFactory extends 
ClientInitializerFactory {
             }
             pipeline.addLast("decoder-" + x, decoder);
         }
+        if (configuration.isDisableStreamCache()) {
+            pipeline.addLast("inbound-streamer", new 
HttpInboundStreamHandler());
+        }
         pipeline.addLast("aggregator", new 
HttpObjectAggregator(configuration.getChunkedMaxContentLength()));
-        pipeline.addLast("streamer", new CustomChunkedWriteHandler());
+        pipeline.addLast("outbound-streamer", new HttpOutboundStreamHandler());
 
         if (producer.getConfiguration().getRequestTimeout() > 0) {
             if (LOG.isTraceEnabled()) {
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerInitializerFactory.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerInitializerFactory.java
index 9f97fe4..25b94e7 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerInitializerFactory.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerInitializerFactory.java
@@ -37,6 +37,8 @@ import org.apache.camel.component.netty.ChannelHandlerFactory;
 import org.apache.camel.component.netty.NettyConsumer;
 import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
 import org.apache.camel.component.netty.ServerInitializerFactory;
+import org.apache.camel.component.netty.http.handlers.HttpInboundStreamHandler;
+import 
org.apache.camel.component.netty.http.handlers.HttpOutboundStreamHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,8 +109,11 @@ public class HttpServerInitializerFactory extends 
ServerInitializerFactory {
             }
             pipeline.addLast("encoder-" + x, encoder);
         }
+        if (configuration.isDisableStreamCache()) {
+            pipeline.addLast("inbound-streamer", new 
HttpInboundStreamHandler());
+        }
         pipeline.addLast("aggregator", new 
HttpObjectAggregator(configuration.getChunkedMaxContentLength()));
-        pipeline.addLast("streamer", new CustomChunkedWriteHandler());
+        pipeline.addLast("outbound-streamer", new HttpOutboundStreamHandler());
         if (supportCompressed()) {
             pipeline.addLast("deflater", new HttpContentCompressor());
         }
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedInitializerFactory.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedInitializerFactory.java
index 5e9a328..66db3ed 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedInitializerFactory.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedInitializerFactory.java
@@ -30,6 +30,8 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.netty.NettyConsumer;
 import org.apache.camel.component.netty.ServerInitializerFactory;
+import org.apache.camel.component.netty.http.handlers.HttpInboundStreamHandler;
+import 
org.apache.camel.component.netty.http.handlers.HttpOutboundStreamHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,8 +86,9 @@ public class HttpServerSharedInitializerFactory extends 
HttpServerInitializerFac
         pipeline.addLast("decoder", new HttpRequestDecoder(4096, 
configuration.getMaxHeaderSize(), 8192));
         pipeline.addLast("encoder", new HttpResponseEncoder());
         if (configuration.isChunked()) {
+            pipeline.addLast("inbound-streamer", new 
HttpInboundStreamHandler());
             pipeline.addLast("aggregator", new 
HttpObjectAggregator(configuration.getChunkedMaxContentLength()));
-            pipeline.addLast("streamer", new CustomChunkedWriteHandler());
+            pipeline.addLast("outbound-streamer", new 
HttpOutboundStreamHandler());
         }
         if (configuration.isCompression()) {
             pipeline.addLast("deflater", new HttpContentCompressor());
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpRequest.java
similarity index 57%
copy from 
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java
copy to 
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpRequest.java
index e07fa2a..5ef80a2 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpRequest.java
@@ -14,21 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.camel.component.netty.http;
 
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.stream.ChunkedWriteHandler;
+import java.io.InputStream;
+
+import io.netty.handler.codec.http.HttpRequest;
+
+public class InboundStreamHttpRequest {
+    private HttpRequest request;
+    private InputStream in;
 
-public class CustomChunkedWriteHandler extends ChunkedWriteHandler {
-    @Override
-    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
-        if (msg instanceof ChunkedHttpRequest) {
-            super.write(ctx, ((ChunkedHttpRequest) msg).getRequest(), promise);
-        } else if (msg instanceof ChunkedHttpResponse) {
-            super.write(ctx, ((ChunkedHttpResponse)msg).getResponse(), 
promise);
-        }
-        super.write(ctx, msg, promise);
+    public InboundStreamHttpRequest(HttpRequest request, InputStream in) {
+        this.request = request;
+        this.in = in;
     }
 
+    public InputStream getInputStream() {
+        return in;
+    }
+
+    public HttpRequest getHttpRequest() {
+        return request;
+    }
 }
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpResponse.java
similarity index 57%
copy from 
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java
copy to 
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpResponse.java
index e07fa2a..476f59c 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpResponse.java
@@ -14,21 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.camel.component.netty.http;
 
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.stream.ChunkedWriteHandler;
+import java.io.InputStream;
+
+import io.netty.handler.codec.http.HttpResponse;
+
+public class InboundStreamHttpResponse {
+    private HttpResponse response;
+    private InputStream in;
 
-public class CustomChunkedWriteHandler extends ChunkedWriteHandler {
-    @Override
-    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
-        if (msg instanceof ChunkedHttpRequest) {
-            super.write(ctx, ((ChunkedHttpRequest) msg).getRequest(), promise);
-        } else if (msg instanceof ChunkedHttpResponse) {
-            super.write(ctx, ((ChunkedHttpResponse)msg).getResponse(), 
promise);
-        }
-        super.write(ctx, msg, promise);
+    public InboundStreamHttpResponse(HttpResponse response, InputStream in) {
+        this.response = response;
+        this.in = in;
     }
 
+    public InputStream getInputStream() {
+        return in;
+    }
+
+    public HttpResponse getHttpResponse() {
+        return response;
+    }
 }
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpBinding.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpBinding.java
index 4ccb67c..020f40e 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpBinding.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpBinding.java
@@ -45,7 +45,20 @@ public interface NettyHttpBinding {
     Message toCamelMessage(FullHttpRequest request, Exchange exchange, 
NettyHttpConfiguration configuration) throws Exception;
 
     /**
+     * Binds from Netty {@link InboundStreamHttpRequest} to Camel {@link 
Message}.
+     * <p/>
+     *
+     * @param request       the netty http request
+     * @param exchange      the exchange that should contain the returned 
message.
+     * @param configuration the endpoint configuration
+     * @return the message to store on the given exchange
+     * @throws Exception is thrown if error during binding
+     */
+    Message toCamelMessage(InboundStreamHttpRequest request, Exchange 
exchange, NettyHttpConfiguration configuration) throws Exception;
+
+    /**
      * Binds from Netty {@link HttpRequest} to Camel headers as a {@link Map}.
+     * Will use the <tt>populateCamelHeaders</tt> method for populating the 
headers.
      *
      * @param request       the netty http request
      * @param headers       the Camel headers that should be populated
@@ -53,7 +66,7 @@ public interface NettyHttpBinding {
      * @param configuration the endpoint configuration
      * @throws Exception is thrown if error during binding
      */
-    void populateCamelHeaders(FullHttpRequest request, Map<String, Object> 
headers, Exchange exchange, NettyHttpConfiguration configuration) throws 
Exception;
+    void populateCamelHeaders(HttpRequest request, Map<String, Object> 
headers, Exchange exchange, NettyHttpConfiguration configuration) throws 
Exception;
 
     /**
      * Binds from Netty {@link HttpResponse} to Camel {@link Message}.
@@ -69,6 +82,19 @@ public interface NettyHttpBinding {
     Message toCamelMessage(FullHttpResponse response, Exchange exchange, 
NettyHttpConfiguration configuration) throws Exception;
 
     /**
+     * Binds from Netty {@link InboundStreamHttpResponse} to Camel {@link 
Message}.
+     * <p/>
+     * Will use the <tt>populateCamelHeaders</tt> method for populating the 
headers.
+     *
+     * @param response      the netty http response
+     * @param exchange      the exchange that should contain the returned 
message.
+     * @param configuration the endpoint configuration
+     * @return the message to store on the given exchange
+     * @throws Exception is thrown if error during binding
+     */
+    Message toCamelMessage(InboundStreamHttpResponse response, Exchange 
exchange, NettyHttpConfiguration configuration) throws Exception;
+
+    /**
      * Binds from Netty {@link HttpResponse} to Camel headers as a {@link Map}.
      *
      * @param response      the netty http response
@@ -77,7 +103,7 @@ public interface NettyHttpBinding {
      * @param configuration the endpoint configuration
      * @throws Exception is thrown if error during binding
      */
-    void populateCamelHeaders(FullHttpResponse response, Map<String, Object> 
headers, Exchange exchange, NettyHttpConfiguration configuration) throws 
Exception;
+    void populateCamelHeaders(HttpResponse response, Map<String, Object> 
headers, Exchange exchange, NettyHttpConfiguration configuration) throws 
Exception;
 
     /**
      * Binds from Camel {@link Message} to Netty {@link HttpResponse}.
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
index efd208f..2da93f3 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
@@ -115,8 +115,14 @@ public class NettyHttpEndpoint extends NettyEndpoint 
implements AsyncEndpoint, H
     public Exchange createExchange(ChannelHandlerContext ctx, Object message) 
throws Exception {
         Exchange exchange = createExchange();
 
-        FullHttpRequest request = (FullHttpRequest) message;
-        Message in = getNettyHttpBinding().toCamelMessage(request, exchange, 
getConfiguration());
+        Message in;
+        if (message instanceof FullHttpRequest) {
+            FullHttpRequest request = (FullHttpRequest) message;
+            in = getNettyHttpBinding().toCamelMessage(request, exchange, 
getConfiguration());
+        } else {
+            InboundStreamHttpRequest request = (InboundStreamHttpRequest) 
message;
+            in = getNettyHttpBinding().toCamelMessage(request, exchange, 
getConfiguration());
+        }
         exchange.setIn(in);
 
         // setup the common message headers
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpRequest.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpRequest.java
similarity index 92%
rename from 
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpRequest.java
rename to 
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpRequest.java
index b34ca18..fc4fbcb 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpRequest.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpRequest.java
@@ -26,10 +26,10 @@ import io.netty.handler.codec.http.HttpContent;
 import io.netty.handler.stream.ChunkedInput;
 import io.netty.handler.stream.ChunkedStream;
 
-public class ChunkedHttpRequest extends DefaultHttpRequest implements 
ChunkedInput<HttpContent> {
+public class OutboundStreamHttpRequest extends DefaultHttpRequest implements 
ChunkedInput<HttpContent> {
     private HttpChunkedInput input;
 
-    public ChunkedHttpRequest(InputStream in, DefaultHttpRequest request) {
+    public OutboundStreamHttpRequest(InputStream in, DefaultHttpRequest 
request) {
         super(request.protocolVersion(), request.method(), request.uri());
         this.input = new HttpChunkedInput(new ChunkedStream(in));
     }
@@ -69,4 +69,3 @@ public class ChunkedHttpRequest extends DefaultHttpRequest 
implements ChunkedInp
         return input.progress();
     }
 }
-
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpResponse.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpResponse.java
similarity index 92%
rename from 
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpResponse.java
rename to 
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpResponse.java
index 29bfa1c..967e0d0 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpResponse.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpResponse.java
@@ -27,10 +27,10 @@ import io.netty.handler.codec.http.HttpContent;
 import io.netty.handler.stream.ChunkedInput;
 import io.netty.handler.stream.ChunkedStream;
 
-public class ChunkedHttpResponse extends DefaultHttpResponse implements 
ChunkedInput<HttpContent> {
+public class OutboundStreamHttpResponse extends DefaultHttpResponse implements 
ChunkedInput<HttpContent> {
     private HttpChunkedInput input;
 
-    public ChunkedHttpResponse(InputStream in, DefaultHttpResponse response) {
+    public OutboundStreamHttpResponse(InputStream in, DefaultHttpResponse 
response) {
         super(response.protocolVersion(), response.status());
         this.input = new HttpChunkedInput(new ChunkedStream(in));
     }
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/RestNettyHttpBinding.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/RestNettyHttpBinding.java
index 71f3324..172ab0d 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/RestNettyHttpBinding.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/RestNettyHttpBinding.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.netty.http;
 import java.net.URI;
 import java.util.Map;
 
-import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpRequest;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.HeaderFilterStrategy;
@@ -46,7 +46,7 @@ public class RestNettyHttpBinding extends 
DefaultNettyHttpBinding {
     }
 
     @Override
-    public void populateCamelHeaders(FullHttpRequest request, Map<String, 
Object> headers, Exchange exchange, NettyHttpConfiguration configuration) 
throws Exception {
+    public void populateCamelHeaders(HttpRequest request, Map<String, Object> 
headers, Exchange exchange, NettyHttpConfiguration configuration) throws 
Exception {
         super.populateCamelHeaders(request, headers, exchange, configuration);
 
         String path = request.uri();
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java
index 132bfb2..3249f11 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java
@@ -23,11 +23,13 @@ import java.util.Map;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpUtil;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.component.netty.NettyConstants;
 import org.apache.camel.component.netty.handlers.ClientChannelHandler;
+import org.apache.camel.component.netty.http.InboundStreamHttpResponse;
 import org.apache.camel.component.netty.http.NettyHttpProducer;
 
 /**
@@ -44,7 +46,20 @@ public class HttpClientChannelHandler extends 
ClientChannelHandler {
 
     @Override
     protected Message getResponseMessage(Exchange exchange, 
ChannelHandlerContext ctx, Object message) throws Exception {
-        FullHttpResponse response = (FullHttpResponse) message;
+        HttpResponse response;
+        Message answer;
+
+        if (message instanceof FullHttpResponse) {
+            FullHttpResponse fullHttpResponse = (FullHttpResponse) message;
+            response = fullHttpResponse;
+            // use the binding
+            answer = 
producer.getEndpoint().getNettyHttpBinding().toCamelMessage(fullHttpResponse, 
exchange, producer.getConfiguration());
+        } else {
+            InboundStreamHttpResponse streamHttpResponse = 
(InboundStreamHttpResponse) message;
+            response = streamHttpResponse.getHttpResponse();
+            answer = 
producer.getEndpoint().getNettyHttpBinding().toCamelMessage(streamHttpResponse, 
exchange, producer.getConfiguration());
+        }
+
         if (!HttpUtil.isKeepAlive(response)) {
             // just want to make sure we close the channel if the keepAlive is 
not true
             
exchange.setProperty(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true);
@@ -59,7 +74,7 @@ public class HttpClientChannelHandler extends 
ClientChannelHandler {
             }
             producer.getEndpoint().getCookieHandler().storeCookies(exchange, 
uri, m);
         }
-        // use the binding
-        return 
producer.getEndpoint().getNettyHttpBinding().toCamelMessage(response, exchange, 
producer.getConfiguration());
+
+        return answer;
     }
 }
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpInboundStreamHandler.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpInboundStreamHandler.java
new file mode 100644
index 0000000..6f07488
--- /dev/null
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpInboundStreamHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.netty.http.handlers;
+
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMessage;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.LastHttpContent;
+import org.apache.camel.component.netty.http.InboundStreamHttpRequest;
+import org.apache.camel.component.netty.http.InboundStreamHttpResponse;
+
+import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
+import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
+
+public class HttpInboundStreamHandler extends 
MessageToMessageDecoder<HttpObject> {
+    private PipedInputStream is;
+    private PipedOutputStream os;
+    private boolean isChunked;
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        super.handlerAdded(ctx);
+        os = new PipedOutputStream();
+        is = new PipedInputStream(os);
+        isChunked = false;
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+        super.handlerRemoved(ctx);
+        os.close();
+        is.close();
+    }
+
+    @Override
+    public boolean acceptInboundMessage(Object msg) throws Exception {
+        if (!super.acceptInboundMessage(msg)) {
+            return false;
+        }
+
+        if (msg instanceof HttpMessage) {
+            HttpHeaders headers = ((HttpMessage) msg).headers();
+            return isChunked = headers.contains(TRANSFER_ENCODING, CHUNKED, 
true);
+        } else {
+            return (msg instanceof HttpContent) && isChunked;
+        }
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, HttpObject msg, 
List<Object> out) throws Exception {
+        if (msg instanceof HttpRequest) {
+            InboundStreamHttpRequest request = new 
InboundStreamHttpRequest((HttpRequest)msg, is);
+            out.add(request);
+        }
+
+        if (msg instanceof HttpResponse) {
+            InboundStreamHttpResponse response = new 
InboundStreamHttpResponse((HttpResponse)msg, is);
+            out.add(response);
+        }
+
+        if (msg instanceof HttpContent) {
+            ByteBuf body = ((HttpContent) msg).content();
+            if (body.readableBytes() > 0) {
+                body.readBytes(os, body.readableBytes());
+            }
+
+            if (msg instanceof LastHttpContent) {
+                os.close();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpOutboundStreamHandler.java
similarity index 57%
rename from 
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java
rename to 
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpOutboundStreamHandler.java
index e07fa2a..95b4a18 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpOutboundStreamHandler.java
@@ -14,21 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.netty.http;
+package org.apache.camel.component.netty.http.handlers;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
 import io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.camel.component.netty.http.OutboundStreamHttpRequest;
+import org.apache.camel.component.netty.http.OutboundStreamHttpResponse;
 
-public class CustomChunkedWriteHandler extends ChunkedWriteHandler {
+public class HttpOutboundStreamHandler extends ChunkedWriteHandler {
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
-        if (msg instanceof ChunkedHttpRequest) {
-            super.write(ctx, ((ChunkedHttpRequest) msg).getRequest(), promise);
-        } else if (msg instanceof ChunkedHttpResponse) {
-            super.write(ctx, ((ChunkedHttpResponse)msg).getResponse(), 
promise);
+        boolean needNewPromise = false;
+
+        if (msg instanceof OutboundStreamHttpRequest) {
+            super.write(ctx, ((OutboundStreamHttpRequest) msg).getRequest(), 
promise);
+            needNewPromise = true;
+        } else if (msg instanceof OutboundStreamHttpResponse) {
+            super.write(ctx, ((OutboundStreamHttpResponse)msg).getResponse(), 
promise);
+            needNewPromise = true;
+        }
+
+        if (needNewPromise) {
+            promise = new DefaultChannelPromise(ctx.channel());
         }
+
         super.write(ctx, msg, promise);
     }
-
 }
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
index 888bede..30498c9 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
@@ -40,6 +40,7 @@ import org.apache.camel.component.netty.NettyConverter;
 import org.apache.camel.component.netty.NettyHelper;
 import org.apache.camel.component.netty.handlers.ServerChannelHandler;
 import org.apache.camel.component.netty.http.HttpPrincipal;
+import org.apache.camel.component.netty.http.InboundStreamHttpRequest;
 import org.apache.camel.component.netty.http.NettyHttpConfiguration;
 import org.apache.camel.component.netty.http.NettyHttpConsumer;
 import org.apache.camel.component.netty.http.NettyHttpSecurityConfiguration;
@@ -77,7 +78,12 @@ public class HttpServerChannelHandler extends 
ServerChannelHandler {
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-        HttpRequest request = (HttpRequest) msg;
+        HttpRequest request;
+        if (msg instanceof HttpRequest) {
+            request = (HttpRequest) msg;
+        } else {
+            request = ((InboundStreamHttpRequest) msg).getHttpRequest();
+        }
 
         LOG.debug("Message received: {}", request);
 
@@ -272,7 +278,13 @@ public class HttpServerChannelHandler extends 
ServerChannelHandler {
             exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
             exchange.setProperty(Exchange.SKIP_WWW_FORM_URLENCODED, 
Boolean.TRUE);
         }
-        HttpRequest request = (HttpRequest) message;
+
+        HttpRequest request;
+        if (message instanceof HttpRequest) {
+            request = (HttpRequest) message;
+        } else {
+            request = ((InboundStreamHttpRequest)message).getHttpRequest();
+        }
         // setup the connection property in case of the message header is 
removed
         boolean keepAlive = HttpUtil.isKeepAlive(request);
         if (!keepAlive) {
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java
index 1195a10..8e0ae91 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java
@@ -36,6 +36,7 @@ import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.netty.http.HttpServerConsumerChannelFactory;
+import org.apache.camel.component.netty.http.InboundStreamHttpRequest;
 import org.apache.camel.component.netty.http.NettyHttpConfiguration;
 import org.apache.camel.component.netty.http.NettyHttpConsumer;
 import org.apache.camel.http.common.CamelServlet;
@@ -108,7 +109,12 @@ public class HttpServerMultiplexChannelHandler extends 
SimpleChannelInboundHandl
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws 
Exception {
         // store request, as this channel handler is created per pipeline
-        HttpRequest request = (HttpRequest) msg;
+        HttpRequest request;
+        if (msg instanceof HttpRequest) {
+            request = (HttpRequest) msg;
+        } else {
+            request = ((InboundStreamHttpRequest) msg).getHttpRequest();
+        }
 
         LOG.debug("Message received: {}", request);
 
@@ -147,7 +153,7 @@ public class HttpServerMultiplexChannelHandler extends 
SimpleChannelInboundHandl
                     HttpContent httpContent = (HttpContent) msg;
                     httpContent.content().retain();
                 }
-                handler.channelRead(ctx, request);
+                handler.channelRead(ctx, msg);
             }
         } else {
             // okay we cannot process this requires so return either 404 or 
405.
diff --git 
a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamTest.java
 
b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamTest.java
new file mode 100644
index 0000000..50f6d0f
--- /dev/null
+++ 
b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.Test;
+
+public class NettyHttpStreamTest extends BaseNettyTest {
+    public static final long SIZE =  10 * 256;
+
+    @Test
+    public void testUploadStream() {
+        //prepare new request
+        DefaultExchange request = new DefaultExchange(context);
+        request.getIn().setBody("dummy");
+
+        //trigger request
+        Exchange response = template.send("direct:upstream-call", request);
+
+        //validate response success
+        assertFalse("ups", response.isFailed());
+
+        //validate request stream at server
+        MockEndpoint mock = context.getEndpoint("mock:stream-size", 
MockEndpoint.class);
+        Long requestSize = 
mock.getExchanges().get(0).getIn().getBody(Long.class);
+        assertEquals("request size not matching.", SIZE, 
requestSize.longValue());
+    }
+
+    @Test
+    public void testDownloadStream() {
+        //prepare new request
+        DefaultExchange request = new DefaultExchange(context);
+        request.getIn().setBody("dummy");
+
+        //trigger request
+        Exchange response = template.send("direct:download-call", request);
+
+        //validate response success
+        assertFalse("ups", response.isFailed());
+
+        //validate response stream at client
+        assertEquals("response size not matching.", SIZE, 
response.getIn().getBody(Long.class).longValue());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:upstream-call")
+                    .bean(Helper.class, "prepareStream")
+                    
.to("netty-http:http://localhost:{{port}}/upstream?disableStreamCache=true";)
+                    .log("get ${body}");
+
+                from("direct:download-call")
+                    
.to("netty-http:http://localhost:{{port}}/downstream?disableStreamCache=true";)
+                    .bean(Helper.class, "asyncProcessStream")
+                    .log("get ${body}");
+
+                
from("netty-http:http://0.0.0.0:{{port}}/upstream?disableStreamCache=true";)
+                    .bean(Helper.class, "processStream")
+                    .to("mock:stream-size");
+
+                
from("netty-http:http://0.0.0.0:{{port}}/downstream?disableStreamCache=true";)
+                    .bean(Helper.class, "prepareStream");
+            }
+        };
+    }
+}
+
+final class Helper {
+    private Helper() {
+    }
+
+    public static void processStream(Exchange exchange) throws Exception {
+        InputStream is = exchange.getIn().getBody(InputStream.class);
+
+        byte[] buffer = new byte[1024];
+        long read = 0;
+        long total = 0;
+        while ((read = is.read(buffer, 0, buffer.length)) != -1) {
+            total += read;
+        }
+
+        exchange.getIn().setBody(new Long(total));
+    }
+
+    public static CompletableFuture<Void> asyncProcessStream(Exchange 
exchange) {
+        return CompletableFuture.runAsync(() -> {
+            try {
+                processStream(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+        });
+    }
+
+    public static void prepareStream(Exchange exchange) throws Exception {
+        PipedOutputStream pos = new PipedOutputStream();
+        PipedInputStream pis = new PipedInputStream(pos);
+
+        exchange.getIn().setBody(pis);
+
+        StreamWriter sw = new StreamWriter(pos, NettyHttpStreamTest.SIZE);
+        sw.start();
+    }
+}
+
+class StreamWriter extends Thread {
+    private PipedOutputStream pos;
+    private long limit;
+    private byte[] content = "hello world stream".getBytes();
+
+    public StreamWriter(PipedOutputStream pos, long limit) {
+        this.pos = pos;
+        this.limit = limit;
+    }
+
+    @Override
+    public void run() {
+        long count = 0;
+
+        try {
+            while (count < limit) {
+                long len = content.length < (limit - count) ? content.length : 
limit - count;
+                pos.write(content, 0, (int)len);
+                pos.flush();
+                count += len;
+            }
+            pos.close();
+        } catch (Exception e) {
+        }
+    }
+}

Reply via email to