CAMEL-9040: Fixed netty leak in http4 producer

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3563f6e6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3563f6e6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3563f6e6

Branch: refs/heads/master
Commit: 3563f6e6a4cbea2841cdd6e780156683aa575b14
Parents: 74a7020
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed May 4 13:40:38 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed May 4 14:07:56 2016 +0200

----------------------------------------------------------------------
 .../netty4/http/DefaultNettyHttpBinding.java    | 20 +++++-
 .../http/NettyHttpOperationFailedException.java | 28 ++++++--
 .../netty4/http/NettyHttpProducer.java          | 67 ++++++++++++++------
 .../netty4/http/NettyHttp500ErrorTest.java      |  2 +-
 ...yHttp500ErrorThrowExceptionOnServerTest.java |  3 +-
 .../netty4/http/NettyHttpHandle404Test.java     |  4 +-
 .../netty4/http/NettyHttpOkStatusCodeTest.java  |  3 +-
 .../netty4/http/NettyHttpReturnFaultTest.java   |  3 +-
 .../netty4/handlers/ClientChannelHandler.java   |  2 +
 9 files changed, 97 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
index e3a28f7..f8cf4a3 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty4.http;
 
 import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -44,6 +45,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.StreamCache;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyConverter;
@@ -268,8 +270,21 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
             populateCamelHeaders(response, answer.getHeaders(), exchange, 
configuration);
         }
 
-        // keep the body as is, and use type converters
-        answer.setBody(response.content());
+        if (configuration.isDisableStreamCache()) {
+            // keep the body as is, and use type converters
+            answer.setBody(response.content());
+        } else {
+            // stores as byte array as the netty ByteBuf will be freedy when 
the producer is done, and then we
+            // can no longer access the message body
+            response.retain();
+            try {
+                byte[] bytes = 
exchange.getContext().getTypeConverter().convertTo(byte[].class, exchange, 
response.content());
+                answer.setBody(bytes);
+                // TODO: use stream caching
+            } finally {
+                response.release();
+            }
+        }
         return answer;
     }
 
@@ -320,7 +335,6 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
         
         LOG.trace("HTTP Status Code: {}", code);
 
-
         // if there was an exception then use that as body
         if (cause != null) {
             if (configuration.isTransferException()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java
index abea14d..d75ee31 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,11 +16,13 @@
  */
 package org.apache.camel.component.netty4.http;
 
+import java.io.UnsupportedEncodingException;
+
 import io.netty.handler.codec.http.HttpContent;
 import org.apache.camel.CamelException;
+import org.apache.camel.component.netty4.NettyConverter;
 import org.apache.camel.util.ObjectHelper;
 
-
 /**
  * Exception when a Netty HTTP operation failed.
  */
@@ -31,6 +33,7 @@ public class NettyHttpOperationFailedException extends 
CamelException {
     private final int statusCode;
     private final String statusText;
     private final transient HttpContent content;
+    private String contentAsString;
 
     public NettyHttpOperationFailedException(String uri, int statusCode, 
String statusText, String location, HttpContent content) {
         super("Netty HTTP operation failed invoking " + uri + " with 
statusCode: " + statusCode + (location != null ? ", redirectLocation: " + 
location : ""));
@@ -39,6 +42,11 @@ public class NettyHttpOperationFailedException extends 
CamelException {
         this.statusText = statusText;
         this.redirectLocation = location;
         this.content = content;
+        try {
+            this.contentAsString = NettyConverter.toString(content.content(), 
null);
+        } catch (UnsupportedEncodingException e) {
+            // ignore
+        }
     }
 
     public String getUri() {
@@ -70,8 +78,20 @@ public class NettyHttpOperationFailedException extends 
CamelException {
      * <p/>
      * Notice this may be <tt>null</tt> if this exception has been serialized,
      * as the {@link HttpContent} instance is marked as transient in this 
class.
+     *
+     * @deprecated use getContentAsString();
      */
+    @Deprecated
     public HttpContent getHttpContent() {
         return content;
     }
+
+    /**
+     * Gets the HTTP content as a String
+     * <p/>
+     * Notice this may be <tt>null</tt> if it was not possible to read the 
content
+     */
+    public String getContentAsString() {
+        return contentAsString;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
index ced0bdc..37e7ad8 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
@@ -21,11 +21,14 @@ import java.net.URI;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpRequest;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.netty4.NettyConfiguration;
 import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyProducer;
+import org.apache.camel.support.SynchronizationAdapter;
 
 
 /**
@@ -58,7 +61,7 @@ public class NettyHttpProducer extends NettyProducer {
         String uri = NettyHttpHelper.createURL(exchange, getEndpoint());
         URI u = NettyHttpHelper.createURI(exchange, uri, getEndpoint());
 
-        HttpRequest request = 
getEndpoint().getNettyHttpBinding().toNettyRequest(exchange.getIn(), 
u.toString(), getConfiguration());
+        final HttpRequest request = 
getEndpoint().getNettyHttpBinding().toNettyRequest(exchange.getIn(), 
u.toString(), getConfiguration());
         String actualUri = request.getUri();
         exchange.getIn().setHeader(Exchange.HTTP_URL, actualUri);
         // Need to check if we need to close the connection or not
@@ -71,6 +74,19 @@ public class NettyHttpProducer extends NettyProducer {
             exchange.getIn().removeHeader("host");
         }
 
+        // need to release the request when we are done
+        exchange.addOnCompletion(new SynchronizationAdapter(){
+            @Override
+            public void onDone(Exchange exchange) {
+                if (request instanceof ReferenceCounted) {
+                    if (((ReferenceCounted) request).refCnt() > 0) {
+                        log.debug("Releasing Netty HttpRequest ByteBuf");
+                        ReferenceCountUtil.release(request);
+                    }
+                }
+            }
+        });
+
         return request;
     }
 
@@ -92,23 +108,38 @@ public class NettyHttpProducer extends NettyProducer {
         @Override
         public void done(boolean doneSync) {
             try {
-                NettyHttpMessage nettyMessage = exchange.hasOut() ? 
exchange.getOut(NettyHttpMessage.class) : 
exchange.getIn(NettyHttpMessage.class);
-                if (nettyMessage != null) {
-                    FullHttpResponse response = nettyMessage.getHttpResponse();
-                    // Need to retain the ByteBuffer for producer to consumer
-                    if (response != null) {
-                        response.content().retain();
-                        // the actual url is stored on the IN message in the 
getRequestBody method as its accessed on-demand
-                        String actualUrl = 
exchange.getIn().getHeader(Exchange.HTTP_URL, String.class);
-                        int code = response.getStatus() != null ? 
response.getStatus().code() : -1;
-                        log.debug("Http responseCode: {}", code);
-
-                        // if there was a http error code then check if we 
should throw an exception
-                        boolean ok = NettyHttpHelper.isStatusCodeOk(code, 
configuration.getOkStatusCodeRange());
-                        if (!ok && 
getConfiguration().isThrowExceptionOnFailure()) {
-                            // operation failed so populate exception to throw
-                            Exception cause = 
NettyHttpHelper.populateNettyHttpOperationFailedException(exchange, actualUrl, 
response, code, getConfiguration().isTransferException());
-                            exchange.setException(cause);
+                // only handle when we are done asynchronous as then the netty 
producer is done sending, and we have a response
+                if (!doneSync) {
+                    NettyHttpMessage nettyMessage = exchange.hasOut() ? 
exchange.getOut(NettyHttpMessage.class) : 
exchange.getIn(NettyHttpMessage.class);
+                    if (nettyMessage != null) {
+                        final FullHttpResponse response = 
nettyMessage.getHttpResponse();
+                        // Need to retain the ByteBuffer for producer to 
consumer
+                        if (response != null) {
+                            response.content().retain();
+
+                            // need to release the response when we are done
+                            exchange.addOnCompletion(new 
SynchronizationAdapter(){
+                                @Override
+                                public void onDone(Exchange exchange) {
+                                    if (response.refCnt() > 0) {
+                                        log.debug("Releasing Netty HttpResonse 
ByteBuf");
+                                        ReferenceCountUtil.release(response);
+                                    }
+                                }
+                            });
+
+                            // the actual url is stored on the IN message in 
the getRequestBody method as its accessed on-demand
+                            String actualUrl = 
exchange.getIn().getHeader(Exchange.HTTP_URL, String.class);
+                            int code = response.getStatus() != null ? 
response.getStatus().code() : -1;
+                            log.debug("Http responseCode: {}", code);
+
+                            // if there was a http error code then check if we 
should throw an exception
+                            boolean ok = NettyHttpHelper.isStatusCodeOk(code, 
configuration.getOkStatusCodeRange());
+                            if (!ok && 
getConfiguration().isThrowExceptionOnFailure()) {
+                                // operation failed so populate exception to 
throw
+                                Exception cause = 
NettyHttpHelper.populateNettyHttpOperationFailedException(exchange, actualUrl, 
response, code, getConfiguration().isTransferException());
+                                exchange.setException(cause);
+                            }
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java
 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java
index f895fac..4c3b799 100644
--- 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java
+++ 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java
@@ -34,7 +34,7 @@ public class NettyHttp500ErrorTest extends BaseNettyTest {
         } catch (CamelExecutionException e) {
             NettyHttpOperationFailedException cause = 
assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause());
             assertEquals(500, cause.getStatusCode());
-            assertEquals("Camel cannot do this", 
context.getTypeConverter().convertTo(String.class, 
cause.getHttpContent().content()));
+            assertEquals("Camel cannot do this", cause.getContentAsString());
         }
 
         assertMockEndpointsSatisfied();

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
index 13c7f68..450009b 100644
--- 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
+++ 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
@@ -32,11 +32,10 @@ public class NettyHttp500ErrorThrowExceptionOnServerTest 
extends BaseNettyTest {
         } catch (CamelExecutionException e) {
             NettyHttpOperationFailedException cause = 
assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause());
             assertEquals(500, cause.getStatusCode());
-            String trace = context.getTypeConverter().convertTo(String.class, 
cause.getHttpContent().content());
+            String trace = cause.getContentAsString();
             assertNotNull(trace);
             assertTrue(trace.startsWith("java.lang.IllegalArgumentException: 
Camel cannot do this"));
             assertEquals("http://localhost:"; + getPort() + "/foo", 
cause.getUri());
-            cause.getHttpContent().content().release();
         }
 
         assertMockEndpointsSatisfied();

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
index f19690f..dd17a23 100644
--- 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
+++ 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
@@ -71,9 +71,7 @@ public class NettyHttpHandle404Test extends BaseNettyTest {
                                 // instead as an exception that will get 
thrown and thus the route breaks
                                 NettyHttpOperationFailedException cause = 
exchange.getProperty(Exchange.EXCEPTION_CAUGHT, 
NettyHttpOperationFailedException.class);
                                 
exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, cause.getStatusCode());
-                                
exchange.getOut().setBody(cause.getHttpContent().content().toString(Charset.defaultCharset()));
-                                // release as no longer in use
-                                cause.getHttpContent().content().release();
+                                
exchange.getOut().setBody(cause.getContentAsString());
                             }
                         })
                         .end();

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
index 0a0fa36..3aef7f3 100644
--- 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
+++ 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
@@ -32,9 +32,8 @@ public class NettyHttpOkStatusCodeTest extends BaseNettyTest {
         } catch (CamelExecutionException e) {
             NettyHttpOperationFailedException cause = 
assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause());
             assertEquals(209, cause.getStatusCode());
-            String body = context.getTypeConverter().convertTo(String.class, 
cause.getHttpContent().content());
+            String body = cause.getContentAsString();
             assertEquals("Not allowed", body);
-            cause.getHttpContent().content().release();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
index 9b8def5..fb452a0 100644
--- 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
+++ 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
@@ -36,9 +36,8 @@ public class NettyHttpReturnFaultTest extends BaseNettyTest {
         NettyHttpOperationFailedException exception = 
exchange.getException(NettyHttpOperationFailedException.class);
         assertNotNull(exception);
         assertEquals(500, exception.getStatusCode());
-        String message = context.getTypeConverter().convertTo(String.class, 
exception.getHttpContent().content());
+        String message = exception.getContentAsString();
         assertEquals("This is a fault", message);
-        exception.getHttpContent().content().release();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index b9a2a17..60db52f 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -94,6 +94,8 @@ public class ClientChannelHandler extends 
SimpleChannelInboundHandler<Object> {
             // signal callback
             callback.done(false);
         }
+
+        super.exceptionCaught(ctx, cause);
     }
 
     @Override

Reply via email to