This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new fbed4bd CAMEL-14017 - Update to support the full streaming on both inbound and outbound (#3209) fbed4bd is described below commit fbed4bda80840cb910d312388d0bf855141b1e55 Author: Amos Feng <zf...@redhat.com> AuthorDate: Mon Sep 30 11:41:02 2019 +0800 CAMEL-14017 - Update to support the full streaming on both inbound and outbound (#3209) - Add the HttpInboundStreamHandler - Refactor the CustomWriterHandler to HttpOutboundStreamHandler - fix an issue in the writer handler to create a new promise after writing the request/response headers - create the NettyHttpStreamTest --- .../netty/http/DefaultNettyHttpBinding.java | 48 +++++-- .../netty/http/HttpClientInitializerFactory.java | 7 +- .../netty/http/HttpServerInitializerFactory.java | 7 +- .../http/HttpServerSharedInitializerFactory.java | 5 +- ...eHandler.java => InboundStreamHttpRequest.java} | 30 ++-- ...Handler.java => InboundStreamHttpResponse.java} | 30 ++-- .../component/netty/http/NettyHttpBinding.java | 30 +++- .../component/netty/http/NettyHttpEndpoint.java | 10 +- ...Request.java => OutboundStreamHttpRequest.java} | 5 +- ...sponse.java => OutboundStreamHttpResponse.java} | 4 +- .../component/netty/http/RestNettyHttpBinding.java | 4 +- .../http/handlers/HttpClientChannelHandler.java | 21 ++- .../http/handlers/HttpInboundStreamHandler.java | 97 +++++++++++++ .../HttpOutboundStreamHandler.java} | 25 +++- .../http/handlers/HttpServerChannelHandler.java | 16 ++- .../HttpServerMultiplexChannelHandler.java | 10 +- .../component/netty/http/NettyHttpStreamTest.java | 156 +++++++++++++++++++++ 17 files changed, 442 insertions(+), 63 deletions(-) diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java index c4d7601..e5bea4a 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java @@ -113,7 +113,21 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } @Override - public void populateCamelHeaders(FullHttpRequest request, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) throws Exception { + public Message toCamelMessage(InboundStreamHttpRequest request, Exchange exchange, NettyHttpConfiguration configuration) throws Exception { + LOG.trace("toCamelMessage: {}", request); + + NettyHttpMessage answer = new NettyHttpMessage(exchange.getContext(), null, null); + answer.setExchange(exchange); + if (configuration.isMapHeaders()) { + populateCamelHeaders(request.getHttpRequest(), answer.getHeaders(), exchange, configuration); + } + + answer.setBody(request.getInputStream()); + return answer; + } + + @Override + public void populateCamelHeaders(HttpRequest request, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) throws Exception { LOG.trace("populateCamelHeaders: {}", request); // NOTE: these headers is applied using the same logic as camel-http/camel-jetty to be consistent @@ -227,13 +241,13 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { // if we're proxying the body is a buffer that we do not want to consume directly if (request.method().name().equals("POST") && request.headers().get(Exchange.CONTENT_TYPE) != null && request.headers().get(Exchange.CONTENT_TYPE).startsWith(NettyHttpConstants.CONTENT_TYPE_WWW_FORM_URLENCODED) - && !configuration.isBridgeEndpoint() && !configuration.isHttpProxy()) { + && !configuration.isBridgeEndpoint() && !configuration.isHttpProxy() && request instanceof FullHttpRequest) { String charset = "UTF-8"; // Push POST form params into the headers to retain compatibility with DefaultHttpBinding String body; - ByteBuf buffer = request.content(); + ByteBuf buffer = ((FullHttpRequest)request).content(); try { body = buffer.toString(Charset.forName(charset)); } finally { @@ -306,7 +320,21 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } @Override - public void populateCamelHeaders(FullHttpResponse response, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) throws Exception { + public Message toCamelMessage(InboundStreamHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration) throws Exception { + LOG.trace("toCamelMessage: {}", response); + + NettyHttpMessage answer = new NettyHttpMessage(exchange.getContext(), null, null); + answer.setExchange(exchange); + if (configuration.isMapHeaders()) { + populateCamelHeaders(response.getHttpResponse(), answer.getHeaders(), exchange, configuration); + } + + answer.setBody(response.getInputStream()); + return answer; + } + + @Override + public void populateCamelHeaders(HttpResponse response, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) throws Exception { LOG.trace("populateCamelHeaders: {}", response); headers.put(Exchange.HTTP_RESPONSE_CODE, response.status().code()); @@ -394,7 +422,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { HttpResponse response = null; if (response == null && body instanceof InputStream && configuration.isDisableStreamCache()) { - response = new ChunkedHttpResponse((InputStream)body, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code))); + response = new OutboundStreamHttpResponse((InputStream)body, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code))); response.headers().set(TRANSFER_ENCODING, CHUNKED); } @@ -547,7 +575,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } if (request == null && body instanceof InputStream && configuration.isDisableStreamCache()) { - request = new ChunkedHttpRequest((InputStream)body, new DefaultHttpRequest(protocol, httpMethod, uriForRequest)); + request = new OutboundStreamHttpRequest((InputStream)body, new DefaultHttpRequest(protocol, httpMethod, uriForRequest)); request.headers().set(TRANSFER_ENCODING, CHUNKED); } @@ -572,7 +600,6 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } } - if (buffer != null) { if (buffer.readableBytes() > 0) { request = ((DefaultFullHttpRequest)request).replace(buffer); @@ -585,12 +612,11 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } } } - - // update HTTP method accordingly as we know if we have a body or not - HttpMethod method = NettyHttpHelper.createMethod(message, body != null); - request.setMethod(method); } + // update HTTP method accordingly as we know if we have a body or not + HttpMethod method = NettyHttpHelper.createMethod(message, body != null); + request.setMethod(method); TypeConverter tc = message.getExchange().getContext().getTypeConverter(); diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpClientInitializerFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpClientInitializerFactory.java index 94edeb5..43f6c95 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpClientInitializerFactory.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpClientInitializerFactory.java @@ -35,6 +35,8 @@ import org.apache.camel.component.netty.ClientInitializerFactory; import org.apache.camel.component.netty.NettyConfiguration; import org.apache.camel.component.netty.NettyProducer; import org.apache.camel.component.netty.http.handlers.HttpClientChannelHandler; +import org.apache.camel.component.netty.http.handlers.HttpInboundStreamHandler; +import org.apache.camel.component.netty.http.handlers.HttpOutboundStreamHandler; import org.apache.camel.component.netty.ssl.SSLEngineFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,8 +108,11 @@ public class HttpClientInitializerFactory extends ClientInitializerFactory { } pipeline.addLast("decoder-" + x, decoder); } + if (configuration.isDisableStreamCache()) { + pipeline.addLast("inbound-streamer", new HttpInboundStreamHandler()); + } pipeline.addLast("aggregator", new HttpObjectAggregator(configuration.getChunkedMaxContentLength())); - pipeline.addLast("streamer", new CustomChunkedWriteHandler()); + pipeline.addLast("outbound-streamer", new HttpOutboundStreamHandler()); if (producer.getConfiguration().getRequestTimeout() > 0) { if (LOG.isTraceEnabled()) { diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerInitializerFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerInitializerFactory.java index 9f97fe4..25b94e7 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerInitializerFactory.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerInitializerFactory.java @@ -37,6 +37,8 @@ import org.apache.camel.component.netty.ChannelHandlerFactory; import org.apache.camel.component.netty.NettyConsumer; import org.apache.camel.component.netty.NettyServerBootstrapConfiguration; import org.apache.camel.component.netty.ServerInitializerFactory; +import org.apache.camel.component.netty.http.handlers.HttpInboundStreamHandler; +import org.apache.camel.component.netty.http.handlers.HttpOutboundStreamHandler; import org.apache.camel.component.netty.ssl.SSLEngineFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,8 +109,11 @@ public class HttpServerInitializerFactory extends ServerInitializerFactory { } pipeline.addLast("encoder-" + x, encoder); } + if (configuration.isDisableStreamCache()) { + pipeline.addLast("inbound-streamer", new HttpInboundStreamHandler()); + } pipeline.addLast("aggregator", new HttpObjectAggregator(configuration.getChunkedMaxContentLength())); - pipeline.addLast("streamer", new CustomChunkedWriteHandler()); + pipeline.addLast("outbound-streamer", new HttpOutboundStreamHandler()); if (supportCompressed()) { pipeline.addLast("deflater", new HttpContentCompressor()); } diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedInitializerFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedInitializerFactory.java index 5e9a328..66db3ed 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedInitializerFactory.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedInitializerFactory.java @@ -30,6 +30,8 @@ import org.apache.camel.CamelContext; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.netty.NettyConsumer; import org.apache.camel.component.netty.ServerInitializerFactory; +import org.apache.camel.component.netty.http.handlers.HttpInboundStreamHandler; +import org.apache.camel.component.netty.http.handlers.HttpOutboundStreamHandler; import org.apache.camel.component.netty.ssl.SSLEngineFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,8 +86,9 @@ public class HttpServerSharedInitializerFactory extends HttpServerInitializerFac pipeline.addLast("decoder", new HttpRequestDecoder(4096, configuration.getMaxHeaderSize(), 8192)); pipeline.addLast("encoder", new HttpResponseEncoder()); if (configuration.isChunked()) { + pipeline.addLast("inbound-streamer", new HttpInboundStreamHandler()); pipeline.addLast("aggregator", new HttpObjectAggregator(configuration.getChunkedMaxContentLength())); - pipeline.addLast("streamer", new CustomChunkedWriteHandler()); + pipeline.addLast("outbound-streamer", new HttpOutboundStreamHandler()); } if (configuration.isCompression()) { pipeline.addLast("deflater", new HttpContentCompressor()); diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpRequest.java similarity index 57% copy from components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java copy to components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpRequest.java index e07fa2a..5ef80a2 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpRequest.java @@ -14,21 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.camel.component.netty.http; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.handler.stream.ChunkedWriteHandler; +import java.io.InputStream; + +import io.netty.handler.codec.http.HttpRequest; + +public class InboundStreamHttpRequest { + private HttpRequest request; + private InputStream in; -public class CustomChunkedWriteHandler extends ChunkedWriteHandler { - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof ChunkedHttpRequest) { - super.write(ctx, ((ChunkedHttpRequest) msg).getRequest(), promise); - } else if (msg instanceof ChunkedHttpResponse) { - super.write(ctx, ((ChunkedHttpResponse)msg).getResponse(), promise); - } - super.write(ctx, msg, promise); + public InboundStreamHttpRequest(HttpRequest request, InputStream in) { + this.request = request; + this.in = in; } + public InputStream getInputStream() { + return in; + } + + public HttpRequest getHttpRequest() { + return request; + } } diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpResponse.java similarity index 57% copy from components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java copy to components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpResponse.java index e07fa2a..476f59c 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/InboundStreamHttpResponse.java @@ -14,21 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.camel.component.netty.http; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.handler.stream.ChunkedWriteHandler; +import java.io.InputStream; + +import io.netty.handler.codec.http.HttpResponse; + +public class InboundStreamHttpResponse { + private HttpResponse response; + private InputStream in; -public class CustomChunkedWriteHandler extends ChunkedWriteHandler { - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof ChunkedHttpRequest) { - super.write(ctx, ((ChunkedHttpRequest) msg).getRequest(), promise); - } else if (msg instanceof ChunkedHttpResponse) { - super.write(ctx, ((ChunkedHttpResponse)msg).getResponse(), promise); - } - super.write(ctx, msg, promise); + public InboundStreamHttpResponse(HttpResponse response, InputStream in) { + this.response = response; + this.in = in; } + public InputStream getInputStream() { + return in; + } + + public HttpResponse getHttpResponse() { + return response; + } } diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpBinding.java index 4ccb67c..020f40e 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpBinding.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpBinding.java @@ -45,7 +45,20 @@ public interface NettyHttpBinding { Message toCamelMessage(FullHttpRequest request, Exchange exchange, NettyHttpConfiguration configuration) throws Exception; /** + * Binds from Netty {@link InboundStreamHttpRequest} to Camel {@link Message}. + * <p/> + * + * @param request the netty http request + * @param exchange the exchange that should contain the returned message. + * @param configuration the endpoint configuration + * @return the message to store on the given exchange + * @throws Exception is thrown if error during binding + */ + Message toCamelMessage(InboundStreamHttpRequest request, Exchange exchange, NettyHttpConfiguration configuration) throws Exception; + + /** * Binds from Netty {@link HttpRequest} to Camel headers as a {@link Map}. + * Will use the <tt>populateCamelHeaders</tt> method for populating the headers. * * @param request the netty http request * @param headers the Camel headers that should be populated @@ -53,7 +66,7 @@ public interface NettyHttpBinding { * @param configuration the endpoint configuration * @throws Exception is thrown if error during binding */ - void populateCamelHeaders(FullHttpRequest request, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) throws Exception; + void populateCamelHeaders(HttpRequest request, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) throws Exception; /** * Binds from Netty {@link HttpResponse} to Camel {@link Message}. @@ -69,6 +82,19 @@ public interface NettyHttpBinding { Message toCamelMessage(FullHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration) throws Exception; /** + * Binds from Netty {@link InboundStreamHttpResponse} to Camel {@link Message}. + * <p/> + * Will use the <tt>populateCamelHeaders</tt> method for populating the headers. + * + * @param response the netty http response + * @param exchange the exchange that should contain the returned message. + * @param configuration the endpoint configuration + * @return the message to store on the given exchange + * @throws Exception is thrown if error during binding + */ + Message toCamelMessage(InboundStreamHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration) throws Exception; + + /** * Binds from Netty {@link HttpResponse} to Camel headers as a {@link Map}. * * @param response the netty http response @@ -77,7 +103,7 @@ public interface NettyHttpBinding { * @param configuration the endpoint configuration * @throws Exception is thrown if error during binding */ - void populateCamelHeaders(FullHttpResponse response, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) throws Exception; + void populateCamelHeaders(HttpResponse response, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) throws Exception; /** * Binds from Camel {@link Message} to Netty {@link HttpResponse}. diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java index efd208f..2da93f3 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java @@ -115,8 +115,14 @@ public class NettyHttpEndpoint extends NettyEndpoint implements AsyncEndpoint, H public Exchange createExchange(ChannelHandlerContext ctx, Object message) throws Exception { Exchange exchange = createExchange(); - FullHttpRequest request = (FullHttpRequest) message; - Message in = getNettyHttpBinding().toCamelMessage(request, exchange, getConfiguration()); + Message in; + if (message instanceof FullHttpRequest) { + FullHttpRequest request = (FullHttpRequest) message; + in = getNettyHttpBinding().toCamelMessage(request, exchange, getConfiguration()); + } else { + InboundStreamHttpRequest request = (InboundStreamHttpRequest) message; + in = getNettyHttpBinding().toCamelMessage(request, exchange, getConfiguration()); + } exchange.setIn(in); // setup the common message headers diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpRequest.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpRequest.java similarity index 92% rename from components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpRequest.java rename to components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpRequest.java index b34ca18..fc4fbcb 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpRequest.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpRequest.java @@ -26,10 +26,10 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.stream.ChunkedInput; import io.netty.handler.stream.ChunkedStream; -public class ChunkedHttpRequest extends DefaultHttpRequest implements ChunkedInput<HttpContent> { +public class OutboundStreamHttpRequest extends DefaultHttpRequest implements ChunkedInput<HttpContent> { private HttpChunkedInput input; - public ChunkedHttpRequest(InputStream in, DefaultHttpRequest request) { + public OutboundStreamHttpRequest(InputStream in, DefaultHttpRequest request) { super(request.protocolVersion(), request.method(), request.uri()); this.input = new HttpChunkedInput(new ChunkedStream(in)); } @@ -69,4 +69,3 @@ public class ChunkedHttpRequest extends DefaultHttpRequest implements ChunkedInp return input.progress(); } } - diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpResponse.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpResponse.java similarity index 92% rename from components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpResponse.java rename to components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpResponse.java index 29bfa1c..967e0d0 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/ChunkedHttpResponse.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/OutboundStreamHttpResponse.java @@ -27,10 +27,10 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.stream.ChunkedInput; import io.netty.handler.stream.ChunkedStream; -public class ChunkedHttpResponse extends DefaultHttpResponse implements ChunkedInput<HttpContent> { +public class OutboundStreamHttpResponse extends DefaultHttpResponse implements ChunkedInput<HttpContent> { private HttpChunkedInput input; - public ChunkedHttpResponse(InputStream in, DefaultHttpResponse response) { + public OutboundStreamHttpResponse(InputStream in, DefaultHttpResponse response) { super(response.protocolVersion(), response.status()); this.input = new HttpChunkedInput(new ChunkedStream(in)); } diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/RestNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/RestNettyHttpBinding.java index 71f3324..172ab0d 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/RestNettyHttpBinding.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/RestNettyHttpBinding.java @@ -19,7 +19,7 @@ package org.apache.camel.component.netty.http; import java.net.URI; import java.util.Map; -import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpRequest; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.HeaderFilterStrategy; @@ -46,7 +46,7 @@ public class RestNettyHttpBinding extends DefaultNettyHttpBinding { } @Override - public void populateCamelHeaders(FullHttpRequest request, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) throws Exception { + public void populateCamelHeaders(HttpRequest request, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) throws Exception { super.populateCamelHeaders(request, headers, exchange, configuration); String path = request.uri(); diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java index 132bfb2..3249f11 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpClientChannelHandler.java @@ -23,11 +23,13 @@ import java.util.Map; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpUtil; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.component.netty.NettyConstants; import org.apache.camel.component.netty.handlers.ClientChannelHandler; +import org.apache.camel.component.netty.http.InboundStreamHttpResponse; import org.apache.camel.component.netty.http.NettyHttpProducer; /** @@ -44,7 +46,20 @@ public class HttpClientChannelHandler extends ClientChannelHandler { @Override protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) throws Exception { - FullHttpResponse response = (FullHttpResponse) message; + HttpResponse response; + Message answer; + + if (message instanceof FullHttpResponse) { + FullHttpResponse fullHttpResponse = (FullHttpResponse) message; + response = fullHttpResponse; + // use the binding + answer = producer.getEndpoint().getNettyHttpBinding().toCamelMessage(fullHttpResponse, exchange, producer.getConfiguration()); + } else { + InboundStreamHttpResponse streamHttpResponse = (InboundStreamHttpResponse) message; + response = streamHttpResponse.getHttpResponse(); + answer = producer.getEndpoint().getNettyHttpBinding().toCamelMessage(streamHttpResponse, exchange, producer.getConfiguration()); + } + if (!HttpUtil.isKeepAlive(response)) { // just want to make sure we close the channel if the keepAlive is not true exchange.setProperty(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true); @@ -59,7 +74,7 @@ public class HttpClientChannelHandler extends ClientChannelHandler { } producer.getEndpoint().getCookieHandler().storeCookies(exchange, uri, m); } - // use the binding - return producer.getEndpoint().getNettyHttpBinding().toCamelMessage(response, exchange, producer.getConfiguration()); + + return answer; } } diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpInboundStreamHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpInboundStreamHandler.java new file mode 100644 index 0000000..6f07488 --- /dev/null +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpInboundStreamHandler.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.netty.http.handlers; + +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; +import org.apache.camel.component.netty.http.InboundStreamHttpRequest; +import org.apache.camel.component.netty.http.InboundStreamHttpResponse; + +import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING; +import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED; + +public class HttpInboundStreamHandler extends MessageToMessageDecoder<HttpObject> { + private PipedInputStream is; + private PipedOutputStream os; + private boolean isChunked; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + super.handlerAdded(ctx); + os = new PipedOutputStream(); + is = new PipedInputStream(os); + isChunked = false; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + super.handlerRemoved(ctx); + os.close(); + is.close(); + } + + @Override + public boolean acceptInboundMessage(Object msg) throws Exception { + if (!super.acceptInboundMessage(msg)) { + return false; + } + + if (msg instanceof HttpMessage) { + HttpHeaders headers = ((HttpMessage) msg).headers(); + return isChunked = headers.contains(TRANSFER_ENCODING, CHUNKED, true); + } else { + return (msg instanceof HttpContent) && isChunked; + } + } + + @Override + protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception { + if (msg instanceof HttpRequest) { + InboundStreamHttpRequest request = new InboundStreamHttpRequest((HttpRequest)msg, is); + out.add(request); + } + + if (msg instanceof HttpResponse) { + InboundStreamHttpResponse response = new InboundStreamHttpResponse((HttpResponse)msg, is); + out.add(response); + } + + if (msg instanceof HttpContent) { + ByteBuf body = ((HttpContent) msg).content(); + if (body.readableBytes() > 0) { + body.readBytes(os, body.readableBytes()); + } + + if (msg instanceof LastHttpContent) { + os.close(); + } + } + } +} \ No newline at end of file diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpOutboundStreamHandler.java similarity index 57% rename from components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java rename to components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpOutboundStreamHandler.java index e07fa2a..95b4a18 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/CustomChunkedWriteHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpOutboundStreamHandler.java @@ -14,21 +14,32 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.netty.http; +package org.apache.camel.component.netty.http.handlers; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; import io.netty.handler.stream.ChunkedWriteHandler; +import org.apache.camel.component.netty.http.OutboundStreamHttpRequest; +import org.apache.camel.component.netty.http.OutboundStreamHttpResponse; -public class CustomChunkedWriteHandler extends ChunkedWriteHandler { +public class HttpOutboundStreamHandler extends ChunkedWriteHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof ChunkedHttpRequest) { - super.write(ctx, ((ChunkedHttpRequest) msg).getRequest(), promise); - } else if (msg instanceof ChunkedHttpResponse) { - super.write(ctx, ((ChunkedHttpResponse)msg).getResponse(), promise); + boolean needNewPromise = false; + + if (msg instanceof OutboundStreamHttpRequest) { + super.write(ctx, ((OutboundStreamHttpRequest) msg).getRequest(), promise); + needNewPromise = true; + } else if (msg instanceof OutboundStreamHttpResponse) { + super.write(ctx, ((OutboundStreamHttpResponse)msg).getResponse(), promise); + needNewPromise = true; + } + + if (needNewPromise) { + promise = new DefaultChannelPromise(ctx.channel()); } + super.write(ctx, msg, promise); } - } diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java index 888bede..30498c9 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java @@ -40,6 +40,7 @@ import org.apache.camel.component.netty.NettyConverter; import org.apache.camel.component.netty.NettyHelper; import org.apache.camel.component.netty.handlers.ServerChannelHandler; import org.apache.camel.component.netty.http.HttpPrincipal; +import org.apache.camel.component.netty.http.InboundStreamHttpRequest; import org.apache.camel.component.netty.http.NettyHttpConfiguration; import org.apache.camel.component.netty.http.NettyHttpConsumer; import org.apache.camel.component.netty.http.NettyHttpSecurityConfiguration; @@ -77,7 +78,12 @@ public class HttpServerChannelHandler extends ServerChannelHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { - HttpRequest request = (HttpRequest) msg; + HttpRequest request; + if (msg instanceof HttpRequest) { + request = (HttpRequest) msg; + } else { + request = ((InboundStreamHttpRequest) msg).getHttpRequest(); + } LOG.debug("Message received: {}", request); @@ -272,7 +278,13 @@ public class HttpServerChannelHandler extends ServerChannelHandler { exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE); exchange.setProperty(Exchange.SKIP_WWW_FORM_URLENCODED, Boolean.TRUE); } - HttpRequest request = (HttpRequest) message; + + HttpRequest request; + if (message instanceof HttpRequest) { + request = (HttpRequest) message; + } else { + request = ((InboundStreamHttpRequest)message).getHttpRequest(); + } // setup the connection property in case of the message header is removed boolean keepAlive = HttpUtil.isKeepAlive(request); if (!keepAlive) { diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java index 1195a10..8e0ae91 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java @@ -36,6 +36,7 @@ import io.netty.util.Attribute; import io.netty.util.AttributeKey; import org.apache.camel.Exchange; import org.apache.camel.component.netty.http.HttpServerConsumerChannelFactory; +import org.apache.camel.component.netty.http.InboundStreamHttpRequest; import org.apache.camel.component.netty.http.NettyHttpConfiguration; import org.apache.camel.component.netty.http.NettyHttpConsumer; import org.apache.camel.http.common.CamelServlet; @@ -108,7 +109,12 @@ public class HttpServerMultiplexChannelHandler extends SimpleChannelInboundHandl @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // store request, as this channel handler is created per pipeline - HttpRequest request = (HttpRequest) msg; + HttpRequest request; + if (msg instanceof HttpRequest) { + request = (HttpRequest) msg; + } else { + request = ((InboundStreamHttpRequest) msg).getHttpRequest(); + } LOG.debug("Message received: {}", request); @@ -147,7 +153,7 @@ public class HttpServerMultiplexChannelHandler extends SimpleChannelInboundHandl HttpContent httpContent = (HttpContent) msg; httpContent.content().retain(); } - handler.channelRead(ctx, request); + handler.channelRead(ctx, msg); } } else { // okay we cannot process this requires so return either 404 or 405. diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamTest.java new file mode 100644 index 0000000..50f6d0f --- /dev/null +++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty.http; + +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.concurrent.CompletableFuture; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.support.DefaultExchange; +import org.junit.Test; + +public class NettyHttpStreamTest extends BaseNettyTest { + public static final long SIZE = 10 * 256; + + @Test + public void testUploadStream() { + //prepare new request + DefaultExchange request = new DefaultExchange(context); + request.getIn().setBody("dummy"); + + //trigger request + Exchange response = template.send("direct:upstream-call", request); + + //validate response success + assertFalse("ups", response.isFailed()); + + //validate request stream at server + MockEndpoint mock = context.getEndpoint("mock:stream-size", MockEndpoint.class); + Long requestSize = mock.getExchanges().get(0).getIn().getBody(Long.class); + assertEquals("request size not matching.", SIZE, requestSize.longValue()); + } + + @Test + public void testDownloadStream() { + //prepare new request + DefaultExchange request = new DefaultExchange(context); + request.getIn().setBody("dummy"); + + //trigger request + Exchange response = template.send("direct:download-call", request); + + //validate response success + assertFalse("ups", response.isFailed()); + + //validate response stream at client + assertEquals("response size not matching.", SIZE, response.getIn().getBody(Long.class).longValue()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:upstream-call") + .bean(Helper.class, "prepareStream") + .to("netty-http:http://localhost:{{port}}/upstream?disableStreamCache=true") + .log("get ${body}"); + + from("direct:download-call") + .to("netty-http:http://localhost:{{port}}/downstream?disableStreamCache=true") + .bean(Helper.class, "asyncProcessStream") + .log("get ${body}"); + + from("netty-http:http://0.0.0.0:{{port}}/upstream?disableStreamCache=true") + .bean(Helper.class, "processStream") + .to("mock:stream-size"); + + from("netty-http:http://0.0.0.0:{{port}}/downstream?disableStreamCache=true") + .bean(Helper.class, "prepareStream"); + } + }; + } +} + +final class Helper { + private Helper() { + } + + public static void processStream(Exchange exchange) throws Exception { + InputStream is = exchange.getIn().getBody(InputStream.class); + + byte[] buffer = new byte[1024]; + long read = 0; + long total = 0; + while ((read = is.read(buffer, 0, buffer.length)) != -1) { + total += read; + } + + exchange.getIn().setBody(new Long(total)); + } + + public static CompletableFuture<Void> asyncProcessStream(Exchange exchange) { + return CompletableFuture.runAsync(() -> { + try { + processStream(exchange); + } catch (Exception e) { + exchange.setException(e); + } + }); + } + + public static void prepareStream(Exchange exchange) throws Exception { + PipedOutputStream pos = new PipedOutputStream(); + PipedInputStream pis = new PipedInputStream(pos); + + exchange.getIn().setBody(pis); + + StreamWriter sw = new StreamWriter(pos, NettyHttpStreamTest.SIZE); + sw.start(); + } +} + +class StreamWriter extends Thread { + private PipedOutputStream pos; + private long limit; + private byte[] content = "hello world stream".getBytes(); + + public StreamWriter(PipedOutputStream pos, long limit) { + this.pos = pos; + this.limit = limit; + } + + @Override + public void run() { + long count = 0; + + try { + while (count < limit) { + long len = content.length < (limit - count) ? content.length : limit - count; + pos.write(content, 0, (int)len); + pos.flush(); + count += len; + } + pos.close(); + } catch (Exception e) { + } + } +}