CAMEL-7884: Found a better way to handle the lifecycle of the netty buffer with a retain/release so we do not need the defensive copy.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a3ac1fba Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a3ac1fba Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a3ac1fba Branch: refs/heads/camel-2.17.x Commit: a3ac1fba024db0f643841ec708095786fd503c94 Parents: a7ecbaf Author: Claus Ibsen <davscl...@apache.org> Authored: Tue May 3 15:45:06 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue May 3 15:46:53 2016 +0200 ---------------------------------------------------------------------- .../http/NettyChannelBufferStreamCache.java | 16 ++++++++++------ ...ttyChannelBufferStreamCacheOnCompletion.java | 20 ++++---------------- .../netty4/http/NettyHttpConfiguration.java | 5 ++++- 3 files changed, 18 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a3ac1fba/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java index f92fc60..2d6c7be 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java @@ -32,11 +32,12 @@ import org.apache.camel.util.IOHelper; */ public final class NettyChannelBufferStreamCache extends InputStream implements StreamCache { - private ByteBuf buffer; + private final ByteBuf buffer; public NettyChannelBufferStreamCache(ByteBuf buffer) { - this.buffer = buffer; - buffer.markReaderIndex(); + // retain the buffer so we keep it in use until we release it when we are done + this.buffer = buffer.retain(); + this.buffer.markReaderIndex(); } @Override @@ -102,8 +103,11 @@ public final class NettyChannelBufferStreamCache extends InputStream implements return buffer.readableBytes(); } - void defensiveCopyBuffer() { - // make a defensive copy of the buffer - this.buffer = buffer.copy(); + /** + * Release the buffer when we are done using it. + */ + public void release() { + buffer.release(); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/a3ac1fba/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java index 0cc51a4..343fd13 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java @@ -17,18 +17,11 @@ package org.apache.camel.component.netty4.http; import org.apache.camel.Exchange; -import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler; import org.apache.camel.support.SynchronizationAdapter; /** - * A {@link org.apache.camel.spi.Synchronization} to keep track of the unit of work on the current {@link Exchange} - * that has the {@link NettyChannelBufferStreamCache} as message body. This cache is wrapping the raw original - * Netty {@link io.netty.buffer.ByteBuf}. Because the Netty HTTP server ({@link HttpServerChannelHandler}) will - * close the {@link io.netty.buffer.ByteBuf} when Netty is complete processing the HttpMessage, then any further - * access to the cache will cause in a buffer unreadable. In the case of Camel async routing engine will - * handover the processing of the {@link Exchange} to another thread, then we need to keep track of this event - * so we can do a defensive copy of the netty {@link io.netty.buffer.ByteBuf} so Camel is able to read - * the content from other threads, while Netty has closed the original {@link io.netty.buffer.ByteBuf}. + * A {@link org.apache.camel.spi.Synchronization} to handle the lifecycle of the {@link NettyChannelBufferStreamCache} + * so the cache is released when the unit of work of the Exchange is done. */ public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAdapter { @@ -40,14 +33,9 @@ public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAd @Override public void onDone(Exchange exchange) { - // okay netty is no longer being active, so we need to signal to the cache that its to preserve the buffer if still in need. - cache.defensiveCopyBuffer(); + // release the cache when we are done routing the Exchange + cache.release(); } - @Override - public boolean allowHandover() { - // do not allow handover, so we can do the defensive copy in the onDone method - return false; - } } http://git-wip-us.apache.org/repos/asf/camel/blob/a3ac1fba/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java index 97c3a7f..784d007 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java @@ -249,7 +249,10 @@ public class NettyHttpConfiguration extends NettyConfiguration { * can retrieve all data from the stream. However you can set this option to true when you for example need to * access the raw stream, such as streaming it directly to a file or other persistent store. Mind that * if you enable this option, then you cannot read the Netty stream multiple times out of the box, and you would - * need manually to reset the reader index on the Netty raw stream. + * need manually to reset the reader index on the Netty raw stream. Also Netty will auto-close the Netty stream + * when the Netty HTTP server is done processing, which means that if the asynchronous routing engine is in + * use then any asynchronous thread that may continue routing the {@link org.apache.camel.Exchange} may not + * be able to read the Netty stream, because Netty has closed it. */ public void setDisableStreamCache(boolean disableStreamCache) { this.disableStreamCache = disableStreamCache;