CAMEL-9040: Fixed Netty ByteBuf leak in the netty4-http producer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3563f6e6 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3563f6e6 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3563f6e6 Branch: refs/heads/master Commit: 3563f6e6a4cbea2841cdd6e780156683aa575b14 Parents: 74a7020 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed May 4 13:40:38 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed May 4 14:07:56 2016 +0200 ---------------------------------------------------------------------- .../netty4/http/DefaultNettyHttpBinding.java | 20 +++++- .../http/NettyHttpOperationFailedException.java | 28 ++++++-- .../netty4/http/NettyHttpProducer.java | 67 ++++++++++++++------ .../netty4/http/NettyHttp500ErrorTest.java | 2 +- ...yHttp500ErrorThrowExceptionOnServerTest.java | 3 +- .../netty4/http/NettyHttpHandle404Test.java | 4 +- .../netty4/http/NettyHttpOkStatusCodeTest.java | 3 +- .../netty4/http/NettyHttpReturnFaultTest.java | 3 +- .../netty4/handlers/ClientChannelHandler.java | 2 + 9 files changed, 97 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java index e3a28f7..f8cf4a3 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java @@ -17,6 +17,7 @@ package 
org.apache.camel.component.netty4.http; import java.io.ByteArrayOutputStream; +import java.io.InputStream; import java.io.ObjectOutputStream; import java.io.PrintWriter; import java.io.StringWriter; @@ -44,6 +45,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.StreamCache; import org.apache.camel.TypeConverter; import org.apache.camel.component.netty4.NettyConstants; import org.apache.camel.component.netty4.NettyConverter; @@ -268,8 +270,21 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { populateCamelHeaders(response, answer.getHeaders(), exchange, configuration); } - // keep the body as is, and use type converters - answer.setBody(response.content()); + if (configuration.isDisableStreamCache()) { + // keep the body as is, and use type converters + answer.setBody(response.content()); + } else { + // stores as byte array as the netty ByteBuf will be freed when the producer is done, and then we + // can no longer access the message body + response.retain(); + try { + byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, exchange, response.content()); + answer.setBody(bytes); + // TODO: use stream caching + } finally { + response.release(); + } + } return answer; } @@ -320,7 +335,6 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { LOG.trace("HTTP Status Code: {}", code); - // if there was an exception then use that as body if (cause != null) { if (configuration.isTransferException()) { http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java ---------------------------------------------------------------------- diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java index abea14d..d75ee31 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,11 +16,13 @@ */ package org.apache.camel.component.netty4.http; +import java.io.UnsupportedEncodingException; + import io.netty.handler.codec.http.HttpContent; import org.apache.camel.CamelException; +import org.apache.camel.component.netty4.NettyConverter; import org.apache.camel.util.ObjectHelper; - /** * Exception when a Netty HTTP operation failed. */ @@ -31,6 +33,7 @@ public class NettyHttpOperationFailedException extends CamelException { private final int statusCode; private final String statusText; private final transient HttpContent content; + private String contentAsString; public NettyHttpOperationFailedException(String uri, int statusCode, String statusText, String location, HttpContent content) { super("Netty HTTP operation failed invoking " + uri + " with statusCode: " + statusCode + (location != null ? 
", redirectLocation: " + location : "")); @@ -39,6 +42,11 @@ public class NettyHttpOperationFailedException extends CamelException { this.statusText = statusText; this.redirectLocation = location; this.content = content; + try { + this.contentAsString = NettyConverter.toString(content.content(), null); + } catch (UnsupportedEncodingException e) { + // ignore + } } public String getUri() { @@ -70,8 +78,20 @@ public class NettyHttpOperationFailedException extends CamelException { * <p/> * Notice this may be <tt>null</tt> if this exception has been serialized, * as the {@link HttpContent} instance is marked as transient in this class. + * + * @deprecated use getContentAsString(); */ + @Deprecated public HttpContent getHttpContent() { return content; } + + /** + * Gets the HTTP content as a String + * <p/> + * Notice this may be <tt>null</tt> if it was not possible to read the content + */ + public String getContentAsString() { + return contentAsString; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java index ced0bdc..37e7ad8 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java @@ -21,11 +21,14 @@ import java.net.URI; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpRequest; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import 
org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.component.netty4.NettyConfiguration; import org.apache.camel.component.netty4.NettyConstants; import org.apache.camel.component.netty4.NettyProducer; +import org.apache.camel.support.SynchronizationAdapter; /** @@ -58,7 +61,7 @@ public class NettyHttpProducer extends NettyProducer { String uri = NettyHttpHelper.createURL(exchange, getEndpoint()); URI u = NettyHttpHelper.createURI(exchange, uri, getEndpoint()); - HttpRequest request = getEndpoint().getNettyHttpBinding().toNettyRequest(exchange.getIn(), u.toString(), getConfiguration()); + final HttpRequest request = getEndpoint().getNettyHttpBinding().toNettyRequest(exchange.getIn(), u.toString(), getConfiguration()); String actualUri = request.getUri(); exchange.getIn().setHeader(Exchange.HTTP_URL, actualUri); // Need to check if we need to close the connection or not @@ -71,6 +74,19 @@ public class NettyHttpProducer extends NettyProducer { exchange.getIn().removeHeader("host"); } + // need to release the request when we are done + exchange.addOnCompletion(new SynchronizationAdapter(){ + @Override + public void onDone(Exchange exchange) { + if (request instanceof ReferenceCounted) { + if (((ReferenceCounted) request).refCnt() > 0) { + log.debug("Releasing Netty HttpRequest ByteBuf"); + ReferenceCountUtil.release(request); + } + } + } + }); + return request; } @@ -92,23 +108,38 @@ public class NettyHttpProducer extends NettyProducer { @Override public void done(boolean doneSync) { try { - NettyHttpMessage nettyMessage = exchange.hasOut() ? 
exchange.getOut(NettyHttpMessage.class) : exchange.getIn(NettyHttpMessage.class); - if (nettyMessage != null) { - FullHttpResponse response = nettyMessage.getHttpResponse(); - // Need to retain the ByteBuffer for producer to consumer - if (response != null) { - response.content().retain(); - // the actual url is stored on the IN message in the getRequestBody method as its accessed on-demand - String actualUrl = exchange.getIn().getHeader(Exchange.HTTP_URL, String.class); - int code = response.getStatus() != null ? response.getStatus().code() : -1; - log.debug("Http responseCode: {}", code); - - // if there was a http error code then check if we should throw an exception - boolean ok = NettyHttpHelper.isStatusCodeOk(code, configuration.getOkStatusCodeRange()); - if (!ok && getConfiguration().isThrowExceptionOnFailure()) { - // operation failed so populate exception to throw - Exception cause = NettyHttpHelper.populateNettyHttpOperationFailedException(exchange, actualUrl, response, code, getConfiguration().isTransferException()); - exchange.setException(cause); + // only handle when we are done asynchronous as then the netty producer is done sending, and we have a response + if (!doneSync) { + NettyHttpMessage nettyMessage = exchange.hasOut() ? 
exchange.getOut(NettyHttpMessage.class) : exchange.getIn(NettyHttpMessage.class); + if (nettyMessage != null) { + final FullHttpResponse response = nettyMessage.getHttpResponse(); + // Need to retain the ByteBuffer for producer to consumer + if (response != null) { + response.content().retain(); + + // need to release the response when we are done + exchange.addOnCompletion(new SynchronizationAdapter(){ + @Override + public void onDone(Exchange exchange) { + if (response.refCnt() > 0) { + log.debug("Releasing Netty HttpResonse ByteBuf"); + ReferenceCountUtil.release(response); + } + } + }); + + // the actual url is stored on the IN message in the getRequestBody method as its accessed on-demand + String actualUrl = exchange.getIn().getHeader(Exchange.HTTP_URL, String.class); + int code = response.getStatus() != null ? response.getStatus().code() : -1; + log.debug("Http responseCode: {}", code); + + // if there was a http error code then check if we should throw an exception + boolean ok = NettyHttpHelper.isStatusCodeOk(code, configuration.getOkStatusCodeRange()); + if (!ok && getConfiguration().isThrowExceptionOnFailure()) { + // operation failed so populate exception to throw + Exception cause = NettyHttpHelper.populateNettyHttpOperationFailedException(exchange, actualUrl, response, code, getConfiguration().isTransferException()); + exchange.setException(cause); + } } } } http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java index f895fac..4c3b799 100644 --- 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java +++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java @@ -34,7 +34,7 @@ public class NettyHttp500ErrorTest extends BaseNettyTest { } catch (CamelExecutionException e) { NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause()); assertEquals(500, cause.getStatusCode()); - assertEquals("Camel cannot do this", context.getTypeConverter().convertTo(String.class, cause.getHttpContent().content())); + assertEquals("Camel cannot do this", cause.getContentAsString()); } assertMockEndpointsSatisfied(); http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java index 13c7f68..450009b 100644 --- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java +++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java @@ -32,11 +32,10 @@ public class NettyHttp500ErrorThrowExceptionOnServerTest extends BaseNettyTest { } catch (CamelExecutionException e) { NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause()); assertEquals(500, cause.getStatusCode()); - String trace = context.getTypeConverter().convertTo(String.class, cause.getHttpContent().content()); + String trace = 
cause.getContentAsString(); assertNotNull(trace); assertTrue(trace.startsWith("java.lang.IllegalArgumentException: Camel cannot do this")); assertEquals("http://localhost:" + getPort() + "/foo", cause.getUri()); - cause.getHttpContent().content().release(); } assertMockEndpointsSatisfied(); http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java index f19690f..dd17a23 100644 --- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java +++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java @@ -71,9 +71,7 @@ public class NettyHttpHandle404Test extends BaseNettyTest { // instead as an exception that will get thrown and thus the route breaks NettyHttpOperationFailedException cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, NettyHttpOperationFailedException.class); exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, cause.getStatusCode()); - exchange.getOut().setBody(cause.getHttpContent().content().toString(Charset.defaultCharset())); - // release as no longer in use - cause.getHttpContent().content().release(); + exchange.getOut().setBody(cause.getContentAsString()); } }) .end(); http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java index 0a0fa36..3aef7f3 100644 --- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java +++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java @@ -32,9 +32,8 @@ public class NettyHttpOkStatusCodeTest extends BaseNettyTest { } catch (CamelExecutionException e) { NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause()); assertEquals(209, cause.getStatusCode()); - String body = context.getTypeConverter().convertTo(String.class, cause.getHttpContent().content()); + String body = cause.getContentAsString(); assertEquals("Not allowed", body); - cause.getHttpContent().content().release(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java index 9b8def5..fb452a0 100644 --- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java +++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java @@ -36,9 +36,8 @@ public class NettyHttpReturnFaultTest extends BaseNettyTest { NettyHttpOperationFailedException exception = exchange.getException(NettyHttpOperationFailedException.class); assertNotNull(exception); assertEquals(500, 
exception.getStatusCode()); - String message = context.getTypeConverter().convertTo(String.class, exception.getHttpContent().content()); + String message = exception.getContentAsString(); assertEquals("This is a fault", message); - exception.getHttpContent().content().release(); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java index b9a2a17..60db52f 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java @@ -94,6 +94,8 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { // signal callback callback.done(false); } + + super.exceptionCaught(ctx, cause); } @Override