Repository: camel
Updated Branches:
  refs/heads/master ced84063a -> 7031ec87c


CAMEL-8086: Fixed memory leak with convertBodyTo/transform/setBody if used in a 
long running Camel route such as a dynamic router that does a while loop 1000 
times etc on the same Exchange.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7031ec87
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7031ec87
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7031ec87

Branch: refs/heads/master
Commit: 7031ec87cef60db278af8afac62a55539dc7f575
Parents: ced8406
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Dec 3 15:17:38 2014 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Dec 3 17:20:23 2014 +0100

----------------------------------------------------------------------
 .../camel/processor/ConvertBodyProcessor.java   | 26 ++++--
 .../camel/processor/SetBodyProcessor.java       | 19 +++--
 .../camel/processor/TransformProcessor.java     | 28 ++++--
 .../org/apache/camel/util/ExchangeHelper.java   | 22 +++++
 .../DynamicRouterConvertBodyToIssueTest.java    | 90 ++++++++++++++++++++
 5 files changed, 165 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7031ec87/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
index 3ffb190..40e863b 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
@@ -23,6 +23,7 @@ import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 
@@ -60,8 +61,10 @@ public class ConvertBodyProcessor extends ServiceSupport 
implements AsyncProcess
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        Message in = exchange.getIn();
-        if (in.getBody() == null) {
+        boolean out = exchange.hasOut();
+        Message old = out ? exchange.getOut() : exchange.getIn();
+
+        if (old.getBody() == null) {
             // only convert if the is a body
             callback.done(true);
             return true;
@@ -75,7 +78,7 @@ public class ConvertBodyProcessor extends ServiceSupport 
implements AsyncProcess
         // use mandatory conversion
         Object value;
         try {
-            value = in.getMandatoryBody(type);
+            value = old.getMandatoryBody(type);
         } catch (Exception e) {
             exchange.setException(e);
             callback.done(true);
@@ -83,14 +86,19 @@ public class ConvertBodyProcessor extends ServiceSupport 
implements AsyncProcess
         }
 
         // create a new message container so we do not drag specialized 
message objects along
-        Message msg = new DefaultMessage();
-        msg.copyFrom(in);
-        msg.setBody(value);
+        // but that is only needed if the old message is a specialized message
+        boolean copyNeeded = !(old.getClass().equals(DefaultMessage.class));
+
+        if (copyNeeded) {
+            Message msg = new DefaultMessage();
+            msg.copyFrom(old);
+            msg.setBody(value);
 
-        if (exchange.getPattern().isOutCapable()) {
-            exchange.setOut(msg);
+            // replace message on exchange
+            ExchangeHelper.replaceMessage(exchange, msg, false);
         } else {
-            exchange.setIn(msg);
+            // no copy needed so set replace value directly
+            old.setBody(value);
         }
 
         // remove charset when we are done as we should not propagate that,

http://git-wip-us.apache.org/repos/asf/camel/blob/7031ec87/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java
index 42f2834..8da646a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java
@@ -25,6 +25,7 @@ import org.apache.camel.Traceable;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
 
 /**
  * A processor which sets the body on the IN or OUT message with an {@link 
Expression}
@@ -49,15 +50,21 @@ public class SetBodyProcessor extends ServiceSupport 
implements AsyncProcessor,
             Message old = out ? exchange.getOut() : exchange.getIn();
 
             // create a new message container so we do not drag specialized 
message objects along
-            Message msg = new DefaultMessage();
-            msg.copyFrom(old);
-            msg.setBody(newBody);
+            // but that is only needed if the old message is a specialized 
message
+            boolean copyNeeded = 
!(old.getClass().equals(DefaultMessage.class));
 
-            if (out) {
-                exchange.setOut(msg);
+            if (copyNeeded) {
+                Message msg = new DefaultMessage();
+                msg.copyFrom(old);
+                msg.setBody(newBody);
+
+                // replace message on exchange
+                ExchangeHelper.replaceMessage(exchange, msg, false);
             } else {
-                exchange.setIn(msg);
+                // no copy needed so set replace value directly
+                old.setBody(newBody);
             }
+
         } catch (Exception e) {
             exchange.setException(e);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/7031ec87/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java
index 7c8ee79..61db471 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java
@@ -25,6 +25,7 @@ import org.apache.camel.Traceable;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 
 /**
@@ -46,13 +47,30 @@ public class TransformProcessor extends ServiceSupport 
implements AsyncProcessor
         try {
             Object newBody = expression.evaluate(exchange, Object.class);
 
-            Message old = exchange.getIn();
+            boolean out = exchange.hasOut();
+            Message old = out ? exchange.getOut() : exchange.getIn();
 
             // create a new message container so we do not drag specialized 
message objects along
-            Message msg = new DefaultMessage();
-            msg.copyFrom(old);
-            msg.setBody(newBody);
-            exchange.setOut(msg);
+            // but that is only needed if the old message is a specialized 
message
+            boolean copyNeeded = 
!(old.getClass().equals(DefaultMessage.class));
+
+            if (copyNeeded) {
+                Message msg = new DefaultMessage();
+                msg.copyFrom(old);
+                msg.setBody(newBody);
+
+                // replace message on exchange (must set as OUT)
+                ExchangeHelper.replaceMessage(exchange, msg, true);
+            } else {
+                // no copy needed so set replace value directly
+                old.setBody(newBody);
+
+                // but the message must be on OUT
+                if (!exchange.hasOut()) {
+                    exchange.setOut(exchange.getIn());
+                }
+            }
+
         } catch (Exception e) {
             exchange.setException(e);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/7031ec87/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java 
b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 905ddda..cc6836d 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -43,6 +43,7 @@ import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.MessageSupport;
 import org.apache.camel.spi.UnitOfWork;
 
 /**
@@ -816,6 +817,27 @@ public final class ExchangeHelper {
         return answer;
     }
 
+    /**
+     * Replaces the existing message with the new message
+     *
+     * @param exchange  the exchange
+     * @param newMessage the new message
+     * @param outOnly    whether to replace the message as OUT message
+     */
+    public static void replaceMessage(Exchange exchange, Message newMessage, 
boolean outOnly) {
+        Message old = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        if (outOnly || exchange.hasOut()) {
+            exchange.setOut(newMessage);
+        } else {
+            exchange.setIn(newMessage);
+        }
+
+        // need to de-reference old from the exchange so it can be GC
+        if (old instanceof MessageSupport) {
+            ((MessageSupport) old).setExchange(null);
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private static Map<String, Object> safeCopy(Map<String, Object> 
properties) {
         if (properties == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/7031ec87/camel-core/src/test/java/org/apache/camel/issues/DynamicRouterConvertBodyToIssueTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/issues/DynamicRouterConvertBodyToIssueTest.java
 
b/camel-core/src/test/java/org/apache/camel/issues/DynamicRouterConvertBodyToIssueTest.java
new file mode 100644
index 0000000..8d1239c
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/issues/DynamicRouterConvertBodyToIssueTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Properties;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Ignore;
+
+@Ignore("CAMEL-8086: used for manual testing a memory issue")
+public class DynamicRouterConvertBodyToIssueTest extends ContextTestSupport 
implements Processor {
+
+    private static final int MAX_ITERATIONS = 1000;
+    private static int counter;
+
+    public void testIssue() throws Exception {
+        template.sendBody("seda:foo", "Hello World");
+
+        Thread.sleep(60000);
+    }
+
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo")
+                    
.dynamicRouter().method(DynamicRouterConvertBodyToIssueTest.class, "slip")
+                    .to("mock:result");
+
+                from("direct:while_body")
+                    .process(new DynamicRouterConvertBodyToIssueTest())
+                    .convertBodyTo(String.class);
+            }
+        };
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        log.info("Some: " + counter);
+
+        exchange.setProperty("EXIT", "NO");
+        StringBuffer sb = new StringBuffer();
+        for (int i = 0; i < 10000; i++) {
+            sb.append(UUID.randomUUID().toString());
+        }
+        exchange.getIn().setBody(sb);
+
+        Thread.currentThread().sleep(100);
+
+        if (counter++ > MAX_ITERATIONS) {
+            exchange.setProperty("EXIT", "PLEASE");
+        }
+    }
+
+    public String slip(String body, @Properties Map<String, Object> 
properties) {
+        log.info("slip " + properties.get("EXIT"));
+        if (properties.get("EXIT") != null && 
properties.get("EXIT").equals("PLEASE")) {
+            log.info("Exiting after " + MAX_ITERATIONS + " iterations");
+            return null;
+        } else {
+            return "direct:while_body";
+        }
+    }
+
+}

Reply via email to