Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 19f619eee -> e822ae58c
  refs/heads/master f39b83eeb -> 2e9bce8a0


CAMEL-10171 memory leak when continuation expires

setobject done earlier and exception set on camelExchange.
isExpired method call thru Continuation interface is cancelled and
below issues became invalid as discussed in CXF-7011

https://issues.apache.org/jira/browse/CXF-7002
https://issues.apache.org/jira/browse/CXF-7011

instead

such block means the same;

https://issues.apache.org/jira/browse/CXF-7011?focusedCommentId=15422696&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15422696


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e9bce8a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e9bce8a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e9bce8a

Branch: refs/heads/master
Commit: 2e9bce8a064b6694cb7985955582baca90698b0c
Parents: f39b83e
Author: önder sezgin <ondersez...@gmail.com>
Authored: Tue Aug 16 22:22:46 2016 +0300
Committer: önder sezgin <ondersez...@gmail.com>
Committed: Tue Aug 16 22:22:46 2016 +0300

----------------------------------------------------------------------
 .../org/apache/camel/component/cxf/CxfConsumer.java   | 14 +++++++++++++-
 .../camel/component/cxf/jaxrs/CxfRsInvoker.java       | 14 +++++++++++++-
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2e9bce8a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
index 2271e2f..e16127b 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
@@ -26,6 +26,7 @@ import org.w3c.dom.Element;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Processor;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
 import org.apache.camel.impl.DefaultConsumer;
@@ -148,6 +149,8 @@ public class CxfConsumer extends DefaultConsumer {
 
                     // The continuation could be called before the suspend is 
called
                     continuation.suspend(cxfEndpoint.getContinuationTimeout());
+                    
+                    continuation.setObject(camelExchange);
 
                     // use the asynchronous API to process the exchange
                     getAsyncProcessor().process(camelExchange, new 
AsyncCallback() {
@@ -156,7 +159,6 @@ public class CxfConsumer extends DefaultConsumer {
                             synchronized (continuation) {
                                 LOG.trace("Resuming continuation of 
exchangeId: {}", camelExchange.getExchangeId());
                                 // resume processing after both, sync and 
async callbacks
-                                continuation.setObject(camelExchange);
                                 continuation.resume();
                             }
                         }
@@ -170,6 +172,16 @@ public class CxfConsumer extends DefaultConsumer {
                         CxfConsumer.this.doneUoW(camelExchange);
                     }
 
+                } else if (!continuation.isResumed() && 
!continuation.isPending()) {
+                    org.apache.camel.Exchange camelExchange = 
(org.apache.camel.Exchange)continuation.getObject();
+                    try {
+                        if (!continuation.isPending()) {
+                            camelExchange.setException(new 
ExchangeTimedOutException(camelExchange, cxfEndpoint.getContinuationTimeout()));
+                        }
+                        setResponseBack(cxfExchange, camelExchange);
+                    } finally {
+                        CxfConsumer.this.doneUoW(camelExchange);
+                    }
                 }
             }
             return null;

http://git-wip-us.apache.org/repos/asf/camel/blob/2e9bce8a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
index fb999f0..01563d3 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
@@ -26,6 +26,7 @@ import javax.ws.rs.core.UriInfo;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
@@ -90,13 +91,13 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 // The continuation could be called before the suspend is 
called
                 continuation.suspend(endpoint.getContinuationTimeout());
                 cxfExchange.put(SUSPENED, Boolean.TRUE);
+                continuation.setObject(camelExchange);
                 cxfRsConsumer.getAsyncProcessor().process(camelExchange, new 
AsyncCallback() {
                     public void done(boolean doneSync) {
                         // make sure the continuation resume will not be 
called before the suspend method in other thread
                         synchronized (continuation) {
                             LOG.trace("Resuming continuation of exchangeId: 
{}", camelExchange.getExchangeId());
                             // resume processing after both, sync and async 
callbacks
-                            continuation.setObject(camelExchange);
                             continuation.resume();
                         }
                     }
@@ -111,6 +112,17 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 } finally {
                     cxfRsConsumer.doneUoW(camelExchange);
                 }
+            } else {
+                if (!continuation.isPending()) {
+                    cxfExchange.put(SUSPENED, Boolean.FALSE);
+                    org.apache.camel.Exchange camelExchange = 
(org.apache.camel.Exchange)continuation.getObject();
+                    camelExchange.setException(new 
ExchangeTimedOutException(camelExchange, endpoint.getContinuationTimeout()));
+                    try {
+                        return returnResponse(cxfExchange, camelExchange);
+                    } finally {
+                        cxfRsConsumer.doneUoW(camelExchange);
+                    }
+                }
             }
         }
         return null;

Reply via email to