CAMEL-10171 memory leak when continuation expires

continuation.setObject(camelExchange) is now called earlier, and an
ExchangeTimedOutException is set on the camelExchange.
The isExpired method call through the Continuation interface has been cancelled,
and the issues below became invalid, as discussed in CXF-7011:

https://issues.apache.org/jira/browse/CXF-7002
https://issues.apache.org/jira/browse/CXF-7011

Instead, such a block means the same; see:

https://issues.apache.org/jira/browse/CXF-7011?focusedCommentId=15422696&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15422696


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e822ae58
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e822ae58
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e822ae58

Branch: refs/heads/camel-2.17.x
Commit: e822ae58c43aec84cc4d1ff64df008ea22ddd910
Parents: 19f619e
Author: önder sezgin <ondersez...@gmail.com>
Authored: Tue Aug 16 22:22:46 2016 +0300
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Aug 17 09:52:05 2016 +0200

----------------------------------------------------------------------
 .../org/apache/camel/component/cxf/CxfConsumer.java   | 14 +++++++++++++-
 .../camel/component/cxf/jaxrs/CxfRsInvoker.java       | 14 +++++++++++++-
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e822ae58/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
index 11ec2f2..43561ba 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
@@ -26,6 +26,7 @@ import org.w3c.dom.Element;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Processor;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
 import org.apache.camel.impl.DefaultConsumer;
@@ -149,6 +150,8 @@ public class CxfConsumer extends DefaultConsumer {
 
                     // The continuation could be called before the suspend is 
called
                     continuation.suspend(cxfEndpoint.getContinuationTimeout());
+                    
+                    continuation.setObject(camelExchange);
 
                     // use the asynchronous API to process the exchange
                     getAsyncProcessor().process(camelExchange, new 
AsyncCallback() {
@@ -157,7 +160,6 @@ public class CxfConsumer extends DefaultConsumer {
                             synchronized (continuation) {
                                 LOG.trace("Resuming continuation of 
exchangeId: {}", camelExchange.getExchangeId());
                                 // resume processing after both, sync and 
async callbacks
-                                continuation.setObject(camelExchange);
                                 continuation.resume();
                             }
                         }
@@ -171,6 +173,16 @@ public class CxfConsumer extends DefaultConsumer {
                         CxfConsumer.this.doneUoW(camelExchange);
                     }
 
+                } else if (!continuation.isResumed() && 
!continuation.isPending()) {
+                    org.apache.camel.Exchange camelExchange = 
(org.apache.camel.Exchange)continuation.getObject();
+                    try {
+                        if (!continuation.isPending()) {
+                            camelExchange.setException(new 
ExchangeTimedOutException(camelExchange, cxfEndpoint.getContinuationTimeout()));
+                        }
+                        setResponseBack(cxfExchange, camelExchange);
+                    } finally {
+                        CxfConsumer.this.doneUoW(camelExchange);
+                    }
                 }
             }
             return null;

http://git-wip-us.apache.org/repos/asf/camel/blob/e822ae58/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
index fb999f0..01563d3 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
@@ -26,6 +26,7 @@ import javax.ws.rs.core.UriInfo;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
@@ -90,13 +91,13 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 // The continuation could be called before the suspend is 
called
                 continuation.suspend(endpoint.getContinuationTimeout());
                 cxfExchange.put(SUSPENED, Boolean.TRUE);
+                continuation.setObject(camelExchange);
                 cxfRsConsumer.getAsyncProcessor().process(camelExchange, new 
AsyncCallback() {
                     public void done(boolean doneSync) {
                         // make sure the continuation resume will not be 
called before the suspend method in other thread
                         synchronized (continuation) {
                             LOG.trace("Resuming continuation of exchangeId: 
{}", camelExchange.getExchangeId());
                             // resume processing after both, sync and async 
callbacks
-                            continuation.setObject(camelExchange);
                             continuation.resume();
                         }
                     }
@@ -111,6 +112,17 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 } finally {
                     cxfRsConsumer.doneUoW(camelExchange);
                 }
+            } else {
+                if (!continuation.isPending()) {
+                    cxfExchange.put(SUSPENED, Boolean.FALSE);
+                    org.apache.camel.Exchange camelExchange = 
(org.apache.camel.Exchange)continuation.getObject();
+                    camelExchange.setException(new 
ExchangeTimedOutException(camelExchange, endpoint.getContinuationTimeout()));
+                    try {
+                        return returnResponse(cxfExchange, camelExchange);
+                    } finally {
+                        cxfRsConsumer.doneUoW(camelExchange);
+                    }
+                }
             }
         }
         return null;

Reply via email to