CAMEL-7884: Found a better way to handle the lifecycle of the netty buffer with 
a retain/release so we do not need the defensive copy.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a3ac1fba
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a3ac1fba
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a3ac1fba

Branch: refs/heads/camel-2.17.x
Commit: a3ac1fba024db0f643841ec708095786fd503c94
Parents: a7ecbaf
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue May 3 15:45:06 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue May 3 15:46:53 2016 +0200

----------------------------------------------------------------------
 .../http/NettyChannelBufferStreamCache.java     | 16 ++++++++++------
 ...ttyChannelBufferStreamCacheOnCompletion.java | 20 ++++----------------
 .../netty4/http/NettyHttpConfiguration.java     |  5 ++++-
 3 files changed, 18 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a3ac1fba/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
index f92fc60..2d6c7be 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
@@ -32,11 +32,12 @@ import org.apache.camel.util.IOHelper;
  */
 public final class NettyChannelBufferStreamCache extends InputStream implements StreamCache {
 
-    private ByteBuf buffer;
+    private final ByteBuf buffer;
 
     public NettyChannelBufferStreamCache(ByteBuf buffer) {
-        this.buffer = buffer;
-        buffer.markReaderIndex();
+        // retain the buffer so we keep it in use until we release it when we are done
+        this.buffer = buffer.retain();
+        this.buffer.markReaderIndex();
     }
 
     @Override
@@ -102,8 +103,11 @@ public final class NettyChannelBufferStreamCache extends InputStream implements
         return buffer.readableBytes();
     }
 
-    void defensiveCopyBuffer() {
-        // make a defensive copy of the buffer
-        this.buffer = buffer.copy();
+    /**
+     * Release the buffer when we are done using it.
+     */
+    public void release() {
+        buffer.release();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a3ac1fba/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
index 0cc51a4..343fd13 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
@@ -17,18 +17,11 @@
 package org.apache.camel.component.netty4.http;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
 import org.apache.camel.support.SynchronizationAdapter;
 
 /**
- * A {@link org.apache.camel.spi.Synchronization} to keep track of the unit of work on the current {@link Exchange}
- * that has the {@link NettyChannelBufferStreamCache} as message body. This cache is wrapping the raw original
- * Netty {@link io.netty.buffer.ByteBuf}. Because the Netty HTTP server ({@link HttpServerChannelHandler}) will
- * close the {@link io.netty.buffer.ByteBuf} when Netty is complete processing the HttpMessage, then any further
- * access to the cache will cause in a buffer unreadable. In the case of Camel async routing engine will
- * handover the processing of the {@link Exchange} to another thread, then we need to keep track of this event
- * so we can do a defensive copy of the netty {@link io.netty.buffer.ByteBuf} so Camel is able to read
- * the content from other threads, while Netty has closed the original {@link io.netty.buffer.ByteBuf}.
+ * A {@link org.apache.camel.spi.Synchronization} to handle the lifecycle of the {@link NettyChannelBufferStreamCache}
+ * so the cache is released when the unit of work of the Exchange is done.
  */
 public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAdapter {
 
@@ -40,14 +33,9 @@ public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAd
 
     @Override
     public void onDone(Exchange exchange) {
-        // okay netty is no longer being active, so we need to signal to the cache that its to preserve the buffer if still in need.
-        cache.defensiveCopyBuffer();
+        // release the cache when we are done routing the Exchange
+        cache.release();
     }
 
-    @Override
-    public boolean allowHandover() {
-        // do not allow handover, so we can do the defensive copy in the onDone method
-        return false;
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a3ac1fba/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
index 97c3a7f..784d007 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
@@ -249,7 +249,10 @@ public class NettyHttpConfiguration extends NettyConfiguration {
      * can retrieve all data from the stream. However you can set this option to true when you for example need to
      * access the raw stream, such as streaming it directly to a file or other persistent store. Mind that
      * if you enable this option, then you cannot read the Netty stream multiple times out of the box, and you would
-     * need manually to reset the reader index on the Netty raw stream.
+     * need manually to reset the reader index on the Netty raw stream. Also Netty will auto-close the Netty stream
+     * when the Netty HTTP server is done processing, which means that if the asynchronous routing engine is in
+     * use then any asynchronous thread that may continue routing the {@link org.apache.camel.Exchange} may not
+     * be able to read the Netty stream, because Netty has closed it.
      */
     public void setDisableStreamCache(boolean disableStreamCache) {
         this.disableStreamCache = disableStreamCache;

Reply via email to