Repository: camel Updated Branches: refs/heads/camel-2.13.x 1e064a358 -> 431d06733 refs/heads/camel-2.14.x ef142f9e9 -> 3e2bf592e
CAMEL-8086: Fixed memory leak with convertBodyTo/transform/setBody if used in a long running Camel route such as a dynamic router that does a while loop 1000 times etc on the same Exchange. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/431d0673 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/431d0673 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/431d0673 Branch: refs/heads/camel-2.13.x Commit: 431d06733a7ef151805a3a443734451271e4d6d5 Parents: 1e064a3 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Dec 3 15:17:38 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Dec 3 17:40:22 2014 +0100 ---------------------------------------------------------------------- .../camel/processor/ConvertBodyProcessor.java | 26 ++++-- .../camel/processor/SetBodyProcessor.java | 19 +++-- .../camel/processor/TransformProcessor.java | 28 ++++-- .../org/apache/camel/util/ExchangeHelper.java | 22 +++++ .../DynamicRouterConvertBodyToIssueTest.java | 90 ++++++++++++++++++++ 5 files changed, 165 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/431d0673/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java index 3ffb190..40e863b 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java @@ -23,6 +23,7 @@ import org.apache.camel.Message; import org.apache.camel.impl.DefaultMessage; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; @@ -60,8 +61,10 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess @Override public boolean process(Exchange exchange, AsyncCallback callback) { - Message in = exchange.getIn(); - if (in.getBody() == null) { + boolean out = exchange.hasOut(); + Message old = out ? exchange.getOut() : exchange.getIn(); + + if (old.getBody() == null) { // only convert if the is a body callback.done(true); return true; @@ -75,7 +78,7 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess // use mandatory conversion Object value; try { - value = in.getMandatoryBody(type); + value = old.getMandatoryBody(type); } catch (Exception e) { exchange.setException(e); callback.done(true); @@ -83,14 +86,19 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess } // create a new message container so we do not drag specialized message objects along - Message msg = new DefaultMessage(); - msg.copyFrom(in); - msg.setBody(value); + // but that is only needed if the old message is a specialized message + boolean copyNeeded = !(old.getClass().equals(DefaultMessage.class)); + + if (copyNeeded) { + Message msg = new DefaultMessage(); + msg.copyFrom(old); + msg.setBody(value); - if (exchange.getPattern().isOutCapable()) { - exchange.setOut(msg); + // replace message on exchange + ExchangeHelper.replaceMessage(exchange, msg, false); } else { - exchange.setIn(msg); + // no copy needed so set replace value directly + old.setBody(value); } // remove charset when we are done as we should not propagate that, http://git-wip-us.apache.org/repos/asf/camel/blob/431d0673/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java index 42f2834..8da646a 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java @@ -25,6 +25,7 @@ import org.apache.camel.Traceable; import org.apache.camel.impl.DefaultMessage; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ExchangeHelper; /** * A processor which sets the body on the IN or OUT message with an {@link Expression} @@ -49,15 +50,21 @@ public class SetBodyProcessor extends ServiceSupport implements AsyncProcessor, Message old = out ? exchange.getOut() : exchange.getIn(); // create a new message container so we do not drag specialized message objects along - Message msg = new DefaultMessage(); - msg.copyFrom(old); - msg.setBody(newBody); + // but that is only needed if the old message is a specialized message + boolean copyNeeded = !(old.getClass().equals(DefaultMessage.class)); - if (out) { - exchange.setOut(msg); + if (copyNeeded) { + Message msg = new DefaultMessage(); + msg.copyFrom(old); + msg.setBody(newBody); + + // replace message on exchange + ExchangeHelper.replaceMessage(exchange, msg, false); } else { - exchange.setIn(msg); + // no copy needed so set replace value directly + old.setBody(newBody); } + } catch (Exception e) { exchange.setException(e); } http://git-wip-us.apache.org/repos/asf/camel/blob/431d0673/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java index 7c8ee79..61db471 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java @@ -25,6 +25,7 @@ import org.apache.camel.Traceable; import org.apache.camel.impl.DefaultMessage; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; /** @@ -46,13 +47,30 @@ public class TransformProcessor extends ServiceSupport implements AsyncProcessor try { Object newBody = expression.evaluate(exchange, Object.class); - Message old = exchange.getIn(); + boolean out = exchange.hasOut(); + Message old = out ? exchange.getOut() : exchange.getIn(); // create a new message container so we do not drag specialized message objects along - Message msg = new DefaultMessage(); - msg.copyFrom(old); - msg.setBody(newBody); - exchange.setOut(msg); + // but that is only needed if the old message is a specialized message + boolean copyNeeded = !(old.getClass().equals(DefaultMessage.class)); + + if (copyNeeded) { + Message msg = new DefaultMessage(); + msg.copyFrom(old); + msg.setBody(newBody); + + // replace message on exchange (must set as OUT) + ExchangeHelper.replaceMessage(exchange, msg, true); + } else { + // no copy needed so set replace value directly + old.setBody(newBody); + + // but the message must be on OUT + if (!exchange.hasOut()) { + exchange.setOut(exchange.getIn()); + } + } + } catch (Exception e) { exchange.setException(e); } http://git-wip-us.apache.org/repos/asf/camel/blob/431d0673/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java index 905ddda..cc6836d 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java @@ -43,6 +43,7 @@ import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.TypeConversionException; import org.apache.camel.TypeConverter; import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.MessageSupport; import org.apache.camel.spi.UnitOfWork; /** @@ -816,6 +817,27 @@ public final class ExchangeHelper { return answer; } + /** + * Replaces the existing message with the new message + * + * @param exchange the exchange + * @param newMessage the new message + * @param outOnly whether to replace the message as OUT message + */ + public static void replaceMessage(Exchange exchange, Message newMessage, boolean outOnly) { + Message old = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); + if (outOnly || exchange.hasOut()) { + exchange.setOut(newMessage); + } else { + exchange.setIn(newMessage); + } + + // need to de-reference old from the exchange so it can be GC + if (old instanceof MessageSupport) { + ((MessageSupport) old).setExchange(null); + } + } + @SuppressWarnings("unchecked") private static Map<String, Object> safeCopy(Map<String, Object> properties) { if (properties == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/431d0673/camel-core/src/test/java/org/apache/camel/issues/DynamicRouterConvertBodyToIssueTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/issues/DynamicRouterConvertBodyToIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/DynamicRouterConvertBodyToIssueTest.java new file mode 100644 index 0000000..8d1239c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/DynamicRouterConvertBodyToIssueTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import java.util.Map; +import java.util.UUID; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Properties; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Ignore; + +@Ignore("CAMEL-8086: used for manual testing a memory issue") +public class DynamicRouterConvertBodyToIssueTest extends ContextTestSupport implements Processor { + + private static final int MAX_ITERATIONS = 1000; + private static int counter; + + public void testIssue() throws Exception { + template.sendBody("seda:foo", "Hello World"); + + Thread.sleep(60000); + } + + @Override + protected boolean useJmx() { + return true; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo") + .dynamicRouter().method(DynamicRouterConvertBodyToIssueTest.class, "slip") + .to("mock:result"); + + from("direct:while_body") + .process(new DynamicRouterConvertBodyToIssueTest()) + .convertBodyTo(String.class); + } + }; + } + + @Override + public void process(Exchange exchange) throws Exception { + log.info("Some: " + counter); + + exchange.setProperty("EXIT", "NO"); + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < 10000; i++) { + sb.append(UUID.randomUUID().toString()); + } + exchange.getIn().setBody(sb); + + Thread.currentThread().sleep(100); + + if (counter++ > MAX_ITERATIONS) { + exchange.setProperty("EXIT", "PLEASE"); + } + } + + public String slip(String body, @Properties Map<String, Object> properties) { + log.info("slip " + properties.get("EXIT")); + if (properties.get("EXIT") != null && properties.get("EXIT").equals("PLEASE")) { + log.info("Exiting after " + MAX_ITERATIONS + " iterations"); + return null; + } else { + return "direct:while_body"; + } + } + +}