Repository: camel Updated Branches: refs/heads/master d406c87f1 -> c7a4146d0
CAMEL-9195: Fixed memory leak in undertow producer. Also allow to configure channel options from the endpoint uri. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c7a4146d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c7a4146d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c7a4146d Branch: refs/heads/master Commit: c7a4146d0ace0cd50a306a2501ce859dfaf3e35a Parents: d406c87 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Oct 7 09:17:22 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Oct 7 09:45:50 2015 +0200 ---------------------------------------------------------------------- .../component/undertow/UndertowComponent.java | 7 ++ .../component/undertow/UndertowEndpoint.java | 117 ++++++++++++++++++- .../component/undertow/UndertowProducer.java | 54 +++++++-- .../undertow/UndertowProducerLeakTest.java | 45 +++++++ .../src/test/resources/log4j.properties | 2 +- 5 files changed, 213 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java index 3584819..13ad686 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java @@ -35,6 +35,7 @@ import org.apache.camel.spi.RestApiConsumerFactory; import org.apache.camel.spi.RestConfiguration; import org.apache.camel.spi.RestConsumerFactory; import org.apache.camel.util.FileUtil; +import org.apache.camel.util.IntrospectionSupport; import org.apache.camel.util.URISupport; import org.apache.camel.util.UnsafeUriCharactersEncoder; import org.slf4j.Logger; @@ -58,10 +59,16 @@ public class UndertowComponent extends UriEndpointComponent implements RestConsu URI uriHttpUriAddress = new URI(UnsafeUriCharactersEncoder.encodeHttpURI(remaining)); URI endpointUri = URISupport.createRemainingURI(uriHttpUriAddress, parameters); + // any additional channel options + Map<String, Object> options = IntrospectionSupport.extractProperties(parameters, "option."); + // create the endpoint first UndertowEndpoint endpoint = createEndpointInstance(endpointUri, this); endpoint.setUndertowHttpBinding(undertowHttpBinding); setProperties(endpoint, parameters); + if (options != null) { + endpoint.setOptions(options); + } // then re-create the http uri with the remaining parameters which the endpoint did not use URI httpUri = URISupport.createRemainingURI( http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java index 746b609..4bb0f01 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java @@ -18,6 +18,8 @@ package org.apache.camel.component.undertow; import java.net.URI; import java.net.URISyntaxException; +import java.util.Locale; +import java.util.Map; import javax.net.ssl.SSLContext; import io.undertow.server.HttpServerExchange; @@ -34,16 +36,23 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.util.jsse.SSLContextParameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xnio.Option; +import org.xnio.OptionMap; +import org.xnio.Options; /** * Represents an Undertow endpoint. */ @UriEndpoint(scheme = "undertow", title = "Undertow", syntax = "undertow:httpURI", - consumerClass = UndertowConsumer.class, label = "http") + consumerClass = UndertowConsumer.class, label = "http") public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { + private static final Logger LOG = LoggerFactory.getLogger(UndertowEndpoint.class); private UndertowComponent component; private SSLContext sslContext; + private OptionMap optionMap; @UriPath private URI httpURI; @@ -61,6 +70,14 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr private Boolean throwExceptionOnFailure; @UriParam private Boolean transferException; + @UriPath(label = "producer", defaultValue = "true") + private Boolean keepAlive = Boolean.TRUE; + @UriPath(label = "producer", defaultValue = "true") + private Boolean tcpNoDelay = Boolean.TRUE; + @UriPath(label = "producer", defaultValue = "true") + private Boolean reuseAddresses = Boolean.TRUE; + @UriParam(label = "producer") + private Map<String, Object> options; public UndertowEndpoint(String uri, UndertowComponent component) throws URISyntaxException { super(uri, component); @@ -74,7 +91,7 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr @Override public Producer createProducer() throws Exception { - return new UndertowProducer(this); + return new UndertowProducer(this, optionMap); } @Override @@ -206,6 +223,51 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr this.undertowHttpBinding = undertowHttpBinding; } + public Boolean getKeepAlive() { + return keepAlive; + } + + /** + * Setting to ensure socket is not closed due to inactivity + */ + public void setKeepAlive(Boolean keepAlive) { + this.keepAlive = keepAlive; + } + + public Boolean getTcpNoDelay() { + return tcpNoDelay; + } + + /** + * Setting to improve TCP protocol performance + */ + public void setTcpNoDelay(Boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public Boolean getReuseAddresses() { + return reuseAddresses; + } + + /** + * Setting to facilitate socket multiplexing + */ + public void setReuseAddresses(Boolean reuseAddresses) { + this.reuseAddresses = reuseAddresses; + } + + public Map<String, Object> getOptions() { + return options; + } + + /** + * Sets additional channel options. The options that can be used are defined in {@link org.xnio.Options}. + * To configure from endpoint uri, then prefix each option with <tt>option.</tt>, such as <tt>option.close-abort=true&option.send-buffer=8192</tt> + */ + public void setOptions(Map<String, Object> options) { + this.options = options; + } + @Override protected void doStart() throws Exception { super.doStart(); @@ -213,5 +275,56 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr if (sslContextParameters != null) { sslContext = sslContextParameters.createSSLContext(); } + + // create options map + if (options != null && !options.isEmpty()) { + + // favor to use the classloader that loaded the user application + ClassLoader cl = getComponent().getCamelContext().getApplicationContextClassLoader(); + if (cl == null) { + cl = Options.class.getClassLoader(); + } + + OptionMap.Builder builder = OptionMap.builder(); + for (Map.Entry<String, Object> entry : options.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + if (key != null && value != null) { + // upper case and dash as underscore + key = key.toUpperCase(Locale.ENGLISH).replace('-', '_'); + // must be field name + key = Options.class.getName() + "." + key; + Option option = Option.fromString(key, cl); + value = option.parseValue(value.toString(), cl); + LOG.trace("Parsed option {}={}", option.getName(), value); + builder.set(option, value); + } + } + optionMap = builder.getMap(); + } else { + // use an empty map + optionMap = OptionMap.EMPTY; + } + + // and then configure these default options if they have not been explicit configured + if (keepAlive != null && !optionMap.contains(Options.KEEP_ALIVE)) { + // rebuild map + OptionMap.Builder builder = OptionMap.builder(); + builder.addAll(optionMap).set(Options.KEEP_ALIVE, keepAlive); + optionMap = builder.getMap(); + } + if (tcpNoDelay != null && !optionMap.contains(Options.TCP_NODELAY)) { + // rebuild map + OptionMap.Builder builder = OptionMap.builder(); + builder.addAll(optionMap).set(Options.TCP_NODELAY, tcpNoDelay); + optionMap = builder.getMap(); + } + if (reuseAddresses != null && !optionMap.contains(Options.REUSE_ADDRESSES)) { + // rebuild map + OptionMap.Builder builder = OptionMap.builder(); + builder.addAll(optionMap).set(Options.REUSE_ADDRESSES, tcpNoDelay); + optionMap = builder.getMap(); + } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java index 84a8b26..0d1ba75 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java @@ -34,6 +34,7 @@ import org.apache.camel.Message; import org.apache.camel.TypeConverter; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.IOHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xnio.BufferAllocator; @@ -53,10 +54,14 @@ import org.xnio.XnioWorker; public class UndertowProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(UndertowProducer.class); private UndertowEndpoint endpoint; + private XnioWorker worker; + private ByteBufferSlicePool pool; + private OptionMap options; - public UndertowProducer(UndertowEndpoint endpoint) { + public UndertowProducer(UndertowEndpoint endpoint, OptionMap options) { super(endpoint); this.endpoint = endpoint; + this.options = options; } @Override @@ -66,12 +71,12 @@ public class UndertowProducer extends DefaultAsyncProducer { @Override public boolean process(Exchange exchange, AsyncCallback callback) { + ClientConnection connection = null; + try { final UndertowClient client = UndertowClient.getInstance(); - XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.EMPTY); - IoFuture<ClientConnection> connect = client.connect(endpoint.getHttpURI(), worker, - new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8192, 8192 * 8192), OptionMap.EMPTY); + IoFuture<ClientConnection> connect = client.connect(endpoint.getHttpURI(), worker, pool, options); // creating the url to use takes 2-steps String url = UndertowHelper.createURL(exchange, getEndpoint()); @@ -99,9 +104,11 @@ public class UndertowProducer extends DefaultAsyncProducer { if (LOG.isDebugEnabled()) { LOG.debug("Executing http {} method: {}", method, url); } - connect.get().sendRequest(request, new UndertowProducerCallback(bodyAsByte, exchange, callback)); + connection = connect.get(); + connection.sendRequest(request, new UndertowProducerCallback(connection, bodyAsByte, exchange, callback)); } catch (Exception e) { + IOHelper.close(connection); exchange.setException(e); callback.done(true); return true; @@ -115,23 +122,45 @@ public class UndertowProducer extends DefaultAsyncProducer { return endpoint.getUndertowHttpBinding().toHttpRequest(request, camelExchange.getIn()); } + @Override + protected void doStart() throws Exception { + super.doStart(); + + pool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8192, 8192 * 8192); + worker = Xnio.getInstance().createWorker(options); + + LOG.debug("Created worker: {} with options: {}", worker, options); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + if (worker != null && !worker.isShutdown()) { + LOG.debug("Shutting down worker: {}", worker); + worker.shutdown(); + } + } + /** * Everything important happens in callback */ private class UndertowProducerCallback implements ClientCallback<ClientExchange> { + private final ClientConnection connection; private final ByteBuffer body; private final Exchange camelExchange; private final AsyncCallback callback; - public UndertowProducerCallback(ByteBuffer body, Exchange camelExchange, AsyncCallback callback) { + public UndertowProducerCallback(ClientConnection connection, ByteBuffer body, Exchange camelExchange, AsyncCallback callback) { + this.connection = connection; this.body = body; this.camelExchange = camelExchange; this.callback = callback; } @Override - public void completed(ClientExchange clientExchange) { + public void completed(final ClientExchange clientExchange) { clientExchange.setResponseListener(new ClientCallback<ClientExchange>() { @Override public void completed(ClientExchange clientExchange) { @@ -146,6 +175,7 @@ public class UndertowProducer extends DefaultAsyncProducer { } catch (Exception e) { camelExchange.setException(e); } finally { + IOHelper.close(connection); // make sure to call callback callback.done(false); } @@ -155,8 +185,12 @@ public class UndertowProducer extends DefaultAsyncProducer { public void failed(IOException e) { LOG.trace("failed: {}", e); camelExchange.setException(e); - // make sure to call callback - callback.done(false); + try { + IOHelper.close(connection); + } finally { + // make sure to call callback + callback.done(false); + } } }); @@ -167,6 +201,7 @@ public class UndertowProducer extends DefaultAsyncProducer { } } catch (IOException e) { camelExchange.setException(e); + IOHelper.close(connection); // make sure to call callback callback.done(false); } @@ -176,6 +211,7 @@ public class UndertowProducer extends DefaultAsyncProducer { public void failed(IOException e) { LOG.trace("failed: {}", e); camelExchange.setException(e); + IOHelper.close(connection); // make sure to call callback callback.done(false); } http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java new file mode 100644 index 0000000..cb11463 --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class UndertowProducerLeakTest extends BaseUndertowTest { + + @Test + public void testLeak() throws Exception { + getMockEndpoint("mock:result").expectedMinimumMessageCount(50); + + assertMockEndpointsSatisfied(2, TimeUnit.MINUTES); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("undertow:http://localhost:{{port}}/test").to("log:undertow?showAll=true").to("mock:result"); + + from("timer:foo?period=100").transform(constant("hello world")) + .to("undertow:http://localhost:{{port}}/test"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/test/resources/log4j.properties b/components/camel-undertow/src/test/resources/log4j.properties index b8d5c8b..8b9fe88 100644 --- a/components/camel-undertow/src/test/resources/log4j.properties +++ b/components/camel-undertow/src/test/resources/log4j.properties @@ -21,7 +21,7 @@ log4j.rootLogger=INFO, file # uncomment the following to enable camel debugging -#log4j.logger.org.apache.camel.component.undertow=TRACE +#log4j.logger.org.apache.camel.component.undertow=DEBUG #log4j.logger.org.apache.camel.util.jsse=DEBUG # CONSOLE appender not used by default