This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 11cc3b7c7f8643a5e60abee2b72364713633e986 Author: Amos Feng <zf...@redhat.com> AuthorDate: Fri Jul 12 11:27:16 2019 +0800 CAMEL-12983 - Camel-netty4-http: update to add the chunked http request/response and support stream writing --- .../component/netty4/http/ChunkedHttpRequest.java | 75 +++++++++++++++++++ .../component/netty4/http/ChunkedHttpResponse.java | 75 +++++++++++++++++++ .../netty4/http/CustomChunkedWriteHandler.java | 37 +++++++++ .../netty4/http/DefaultNettyHttpBinding.java | 87 +++++++++++++--------- .../netty4/http/HttpClientInitializerFactory.java | 1 + .../netty4/http/HttpServerInitializerFactory.java | 1 + .../http/HttpServerSharedInitializerFactory.java | 1 + 7 files changed, 243 insertions(+), 34 deletions(-) diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpRequest.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpRequest.java new file mode 100644 index 0000000..58f5c30 --- /dev/null +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpRequest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.component.netty4.http;
+
+import java.io.InputStream;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultHttpRequest;
+import io.netty.handler.codec.http.HttpChunkedInput;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.stream.ChunkedInput;
+import io.netty.handler.stream.ChunkedStream;
+
+/**
+ * An HTTP request head that doubles as the {@link ChunkedInput} producing its body.
+ * <p>
+ * The request line and headers are those of the {@link DefaultHttpRequest} superclass; the body
+ * chunks are read from an {@link InputStream} through an {@link HttpChunkedInput}.
+ *
+ * @author <a href="mailto:zf...@redhat.com">Zheng Feng</a>
+ */
+public class ChunkedHttpRequest extends DefaultHttpRequest implements ChunkedInput<HttpContent> {
+    /** Source of the body chunks; every {@link ChunkedInput} call is delegated to it. */
+    private final HttpChunkedInput input;
+
+    /**
+     * Creates a chunked request whose body is read from {@code in}.
+     *
+     * @param in      stream supplying the request body
+     * @param request template providing the protocol version, method, URI and headers
+     */
+    public ChunkedHttpRequest(InputStream in, DefaultHttpRequest request) {
+        super(request.protocolVersion(), request.method(), request.uri());
+        // Carry the template's headers over instead of silently dropping them.
+        headers().set(request.headers());
+        this.input = new HttpChunkedInput(new ChunkedStream(in));
+    }
+
+    /**
+     * Returns the head of this request as a plain {@link DefaultHttpRequest}.
+     *
+     * @return a new request built from this message's protocol version, method, URI and headers
+     */
+    public DefaultHttpRequest getRequest() {
+        return new DefaultHttpRequest(protocolVersion(), method(), uri(), headers());
+    }
+
+    /** {@inheritDoc} Delegates to the wrapped {@link HttpChunkedInput}. */
+    @Override
+    public boolean isEndOfInput() throws Exception {
+        return input.isEndOfInput();
+    }
+
+    /** {@inheritDoc} Closes the wrapped {@link HttpChunkedInput}. */
+    @Override
+    public void close() throws Exception {
+        input.close();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @deprecated use {@link #readChunk(ByteBufAllocator)} instead
+     */
+    @Override
+    @Deprecated
+    public HttpContent readChunk(ChannelHandlerContext channelHandlerContext) throws Exception {
+        return input.readChunk(channelHandlerContext);
+    }
+
+    /** {@inheritDoc} Delegates to the wrapped {@link HttpChunkedInput}. */
+    @Override
+    public HttpContent readChunk(ByteBufAllocator byteBufAllocator) throws Exception {
+        return input.readChunk(byteBufAllocator);
+    }
+
+    /** {@inheritDoc} Delegates to the wrapped {@link HttpChunkedInput}. */
+    @Override
+    public long length() {
+        return input.length();
+    }
+
+    /** {@inheritDoc} Delegates to the wrapped {@link HttpChunkedInput}. */
+    @Override
+    public long progress() {
+        return input.progress();
+    }
+}
+
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpResponse.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpResponse.java new file mode 100644 index 0000000..779d7d7 --- /dev/null +++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/ChunkedHttpResponse.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.camel.component.netty4.http;
+
+import java.io.InputStream;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.stream.ChunkedInput;
+import io.netty.handler.stream.ChunkedStream;
+
+/**
+ * An HTTP response head that doubles as the {@link ChunkedInput} producing its body.
+ * <p>
+ * The status line and headers are those of the {@link DefaultHttpResponse} superclass; the body
+ * chunks are read from an {@link InputStream} through an {@link HttpChunkedInput}.
+ *
+ * @author <a href="mailto:zf...@redhat.com">Zheng Feng</a>
+ */
+public class ChunkedHttpResponse extends DefaultHttpResponse implements ChunkedInput<HttpContent> {
+    /** Source of the body chunks; every {@link ChunkedInput} call is delegated to it. */
+    private final HttpChunkedInput input;
+
+    /**
+     * Creates a chunked response whose body is read from {@code in}.
+     *
+     * @param in       stream supplying the response body
+     * @param response template providing the protocol version, status and headers
+     */
+    public ChunkedHttpResponse(InputStream in, DefaultHttpResponse response) {
+        super(response.protocolVersion(), response.status());
+        // Carry the template's headers over instead of silently dropping them.
+        headers().set(response.headers());
+        this.input = new HttpChunkedInput(new ChunkedStream(in));
+    }
+
+    /**
+     * Returns the head of this response as a plain {@link DefaultHttpResponse}.
+     *
+     * @return a new response built from this message's protocol version, status and headers
+     */
+    public DefaultHttpResponse getResponse() {
+        return new DefaultHttpResponse(protocolVersion(), status(), headers());
+    }
+
+    /** {@inheritDoc} Delegates to the wrapped {@link HttpChunkedInput}. */
+    @Override
+    public boolean isEndOfInput() throws Exception {
+        return input.isEndOfInput();
+    }
+
+    /** {@inheritDoc} Closes the wrapped {@link HttpChunkedInput}. */
+    @Override
+    public void close() throws Exception {
+        input.close();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @deprecated use {@link #readChunk(ByteBufAllocator)} instead
+     */
+    @Override
+    @Deprecated
+    public HttpContent readChunk(ChannelHandlerContext channelHandlerContext) throws Exception {
+        return input.readChunk(channelHandlerContext);
+    }
+
+    /** {@inheritDoc} Delegates to the wrapped {@link HttpChunkedInput}. */
+    @Override
+    public HttpContent readChunk(ByteBufAllocator byteBufAllocator) throws Exception {
+        return input.readChunk(byteBufAllocator);
+    }
+
+    /** {@inheritDoc} Delegates to the wrapped {@link HttpChunkedInput}. */
+    @Override
+    public long length() {
+        return input.length();
+    }
+
+    /** {@inheritDoc} Delegates to the wrapped {@link HttpChunkedInput}. */
+    @Override
+    public long progress() {
+        return input.progress();
+    }
+}
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/CustomChunkedWriteHandler.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/CustomChunkedWriteHandler.java new file mode 100644 index 0000000..c009ba7 --- /dev/null +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/CustomChunkedWriteHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.component.netty4.http;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * A {@link ChunkedWriteHandler} that understands {@link ChunkedHttpRequest} and
+ * {@link ChunkedHttpResponse}: the message head is queued first as a plain HTTP message, then
+ * the message itself is queued as the chunked input that streams the body.
+ *
+ * @author <a href="mailto:zf...@redhat.com">Zheng Feng</a>
+ */
+public class CustomChunkedWriteHandler extends ChunkedWriteHandler {
+
+    /**
+     * Queues {@code msg} for writing, preceded by its HTTP head when it is a
+     * {@link ChunkedHttpRequest} or {@link ChunkedHttpResponse}. Any other message is passed
+     * to the superclass unchanged.
+     * <p>
+     * The caller's {@code promise} is attached to the message itself (i.e. the streamed body),
+     * not to the head. Sharing it with the head write would complete it as soon as the head
+     * is written, so listeners waiting for the whole message (for example one that closes the
+     * channel) could run before the body has been sent.
+     *
+     * @param ctx     the handler context
+     * @param msg     the message to write
+     * @param promise completed once {@code msg} has been fully written, or failed if writing
+     *                the head or the body fails
+     * @throws Exception if the superclass fails to queue the write
+     */
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        if (msg instanceof ChunkedHttpRequest) {
+            super.write(ctx, ((ChunkedHttpRequest) msg).getRequest(), headPromise(ctx, promise));
+        } else if (msg instanceof ChunkedHttpResponse) {
+            super.write(ctx, ((ChunkedHttpResponse) msg).getResponse(), headPromise(ctx, promise));
+        }
+        super.write(ctx, msg, promise);
+    }
+
+    /** Creates the promise for the head write; only a failure of that write is forwarded to {@code promise}. */
+    private static ChannelPromise headPromise(ChannelHandlerContext ctx, final ChannelPromise promise) {
+        ChannelPromise head = ctx.newPromise();
+        head.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) {
+                if (!future.isSuccess()) {
+                    // tryFailure: the body write may already have completed the promise
+                    promise.tryFailure(future.cause());
+                }
+            }
+        });
+        return head;
+    }
+
+}
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java index ee1e718..e29d8eb 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java @@ -17,6 +17,7 @@ package org.apache.camel.component.netty4.http; import java.io.ByteArrayOutputStream; +import java.io.InputStream; import java.io.ObjectOutputStream; import java.io.PrintWriter; import java.io.StringWriter; @@ -32,6 +33,8 @@ import java.util.Map; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; @@ -58,6 +61,9 @@ import org.apache.camel.util.URISupport; import
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING; +import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED; + /** * Default {@link NettyHttpBinding}. */ @@ -377,44 +383,51 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { ExchangeHelper.setFailureHandled(message.getExchange()); } - if (body instanceof ByteBuf) { - buffer = (ByteBuf) body; - } else { - // try to convert to buffer first - buffer = message.getBody(ByteBuf.class); - if (buffer == null) { - // fallback to byte array as last resort - byte[] data = message.getBody(byte[].class); - if (data != null) { - buffer = NettyConverter.toByteBuffer(data); - } else { - // and if byte array fails then try String - String str; - if (body != null) { - str = message.getMandatoryBody(String.class); + HttpResponse response = null; + + if (response == null && body instanceof InputStream && configuration.isDisableStreamCache()) { + response = new ChunkedHttpResponse((InputStream)body, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code))); + response.headers().set(TRANSFER_ENCODING, CHUNKED); + } + + if (response == null) { + if (body instanceof ByteBuf) { + buffer = (ByteBuf) body; + } else { + // try to convert to buffer first + buffer = message.getBody(ByteBuf.class); + if (buffer == null) { + // fallback to byte array as last resort + byte[] data = message.getBody(byte[].class); + if (data != null) { + buffer = NettyConverter.toByteBuffer(data); } else { - str = ""; + // and if byte array fails then try String + String str; + if (body != null) { + str = message.getMandatoryBody(String.class); + } else { + str = ""; + } + buffer = NettyConverter.toByteBuffer(str.getBytes()); } - buffer = NettyConverter.toByteBuffer(str.getBytes()); } } - } - - HttpResponse response; - if (buffer != null) { - response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.valueOf(code), buffer); - // We just need to reset the readerIndex this time - if (buffer.readerIndex() == buffer.writerIndex()) { - buffer.setIndex(0, buffer.writerIndex()); + if (buffer != null) { + response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code), buffer); + // We just need to reset the readerIndex this time + if (buffer.readerIndex() == buffer.writerIndex()) { + buffer.setIndex(0, buffer.writerIndex()); + } + // TODO How to enable the chunk transport + int len = buffer.readableBytes(); + // set content-length + response.headers().set(HttpHeaderNames.CONTENT_LENGTH.toString(), len); + LOG.trace("Content-Length: {}", len); + } else { + response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code)); } - // TODO How to enable the chunk transport - int len = buffer.readableBytes(); - // set content-length - response.headers().set(HttpHeaderNames.CONTENT_LENGTH.toString(), len); - LOG.trace("Content-Length: {}", len); - } else { - response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code)); } TypeConverter tc = message.getExchange().getContext().getTypeConverter(); @@ -517,7 +530,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { proxyRequest = null; } - FullHttpRequest request = null; + HttpRequest request = null; if (message instanceof NettyHttpMessage) { // if the request is already given we should set the values // from message headers and pass on the same request @@ -532,6 +545,11 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } } + if (request == null && body instanceof InputStream && configuration.isDisableStreamCache()) { + request = new ChunkedHttpRequest((InputStream)body, new DefaultHttpRequest(protocol, httpMethod, uriForRequest)); + request.headers().set(TRANSFER_ENCODING, CHUNKED); + } + if (request == null) { request = new DefaultFullHttpRequest(protocol, 
httpMethod, uriForRequest); @@ -552,10 +570,11 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } } } + if (buffer != null) { if (buffer.readableBytes() > 0) { - request = request.replace(buffer); + request = ((DefaultFullHttpRequest)request).replace(buffer); int len = buffer.readableBytes(); // set content-length request.headers().set(HttpHeaderNames.CONTENT_LENGTH.toString(), len); diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java index b8a0cbf..81c8b73 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java @@ -107,6 +107,7 @@ public class HttpClientInitializerFactory extends ClientInitializerFactory { pipeline.addLast("decoder-" + x, decoder); } pipeline.addLast("aggregator", new HttpObjectAggregator(configuration.getChunkedMaxContentLength())); + pipeline.addLast("streamer", new CustomChunkedWriteHandler()); if (producer.getConfiguration().getRequestTimeout() > 0) { if (LOG.isTraceEnabled()) { diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java index 2549cb0..2659c84 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java @@ -108,6 +108,7 @@ public class HttpServerInitializerFactory extends ServerInitializerFactory { pipeline.addLast("encoder-" + x, encoder); } 
pipeline.addLast("aggregator", new HttpObjectAggregator(configuration.getChunkedMaxContentLength())); + pipeline.addLast("streamer", new CustomChunkedWriteHandler()); if (supportCompressed()) { pipeline.addLast("deflater", new HttpContentCompressor()); } diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java index ac62a6d..e5c50b5 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java @@ -85,6 +85,7 @@ public class HttpServerSharedInitializerFactory extends HttpServerInitializerFac pipeline.addLast("encoder", new HttpResponseEncoder()); if (configuration.isChunked()) { pipeline.addLast("aggregator", new HttpObjectAggregator(configuration.getChunkedMaxContentLength())); + pipeline.addLast("streamer", new CustomChunkedWriteHandler()); } if (configuration.isCompression()) { pipeline.addLast("deflater", new HttpContentCompressor());