Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 6da2fd46b -> a7ecbafc0
  refs/heads/master 3fe56aefa -> 2d80be3bf


CAMEL-7884: camel-netty4-http should do a defensive copy of the netty bytebuf 
if the async routing engine kicks in, so any further processing can still read 
the stream of data. Netty http server will otherwise have closed the original 
bytebuf.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d80be3b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d80be3b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d80be3b

Branch: refs/heads/master
Commit: 2d80be3bf010b4a740e4e9ec427f9d3fd9b45ba3
Parents: 3fe56ae
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue May 3 14:28:14 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue May 3 14:28:14 2016 +0200

----------------------------------------------------------------------
 .../netty4/http/DefaultNettyHttpBinding.java    |  2 +
 .../http/NettyChannelBufferStreamCache.java     |  7 ++-
 ...ttyChannelBufferStreamCacheOnCompletion.java | 53 ++++++++++++++++++++
 3 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2d80be3b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
index 552f5de..fe87f53 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
@@ -95,6 +95,8 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
         } else {
             // turn the body into stream cached
             NettyChannelBufferStreamCache cache = new 
NettyChannelBufferStreamCache(request.content());
+            // add on completion to the cache which is needed for Camel to 
keep track of the lifecycle of the cache
+            exchange.addOnCompletion(new 
NettyChannelBufferStreamCacheOnCompletion(cache));
             answer.setBody(cache);
         }
         return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/2d80be3b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
index 70635f0..f92fc60 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
@@ -32,7 +32,7 @@ import org.apache.camel.util.IOHelper;
  */
 public final class NettyChannelBufferStreamCache extends InputStream 
implements StreamCache {
 
-    private final ByteBuf buffer;
+    private ByteBuf buffer;
 
     public NettyChannelBufferStreamCache(ByteBuf buffer) {
         this.buffer = buffer;
@@ -101,4 +101,9 @@ public final class NettyChannelBufferStreamCache extends 
InputStream implements
     public long length() {
         return buffer.readableBytes();
     }
+
+    void defensiveCopyBuffer() {
+        // make a defensive copy of the buffer
+        this.buffer = buffer.copy();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2d80be3b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
new file mode 100644
index 0000000..0cc51a4
--- /dev/null
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.http;
+
+import org.apache.camel.Exchange;
+import 
org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
+import org.apache.camel.support.SynchronizationAdapter;
+
+/**
+ * A {@link org.apache.camel.spi.Synchronization} to keep track of the unit of 
work on the current {@link Exchange}
+ * that has the {@link NettyChannelBufferStreamCache} as message body. This 
cache is wrapping the raw original
+ * Netty {@link io.netty.buffer.ByteBuf}. Because the Netty HTTP server 
({@link HttpServerChannelHandler}) will
+ * close the {@link io.netty.buffer.ByteBuf} when Netty is complete processing 
the HttpMessage, then any further
+ * access to the cache will cause in a buffer unreadable. In the case of Camel 
async routing engine will
+ * handover the processing of the {@link Exchange} to another thread, then we 
need to keep track of this event
+ * so we can do a defensive copy of the netty {@link io.netty.buffer.ByteBuf} 
so Camel is able to read
+ * the content from other threads, while Netty has closed the original {@link 
io.netty.buffer.ByteBuf}.
+ */
+public class NettyChannelBufferStreamCacheOnCompletion extends 
SynchronizationAdapter {
+
+    private final NettyChannelBufferStreamCache cache;
+
+    public 
NettyChannelBufferStreamCacheOnCompletion(NettyChannelBufferStreamCache cache) {
+        this.cache = cache;
+    }
+
+    @Override
+    public void onDone(Exchange exchange) {
+        // okay netty is no longer being active, so we need to signal to the 
cache that its to preserve the buffer if still in need.
+        cache.defensiveCopyBuffer();
+    }
+
+    @Override
+    public boolean allowHandover() {
+        // do not allow handover, so we can do the defensive copy in the 
onDone method
+        return false;
+    }
+
+}

Reply via email to