CAMEL-10171: Fix memory leak when a continuation expires. setObject is now done earlier, and an exception is set on camelExchange. The isExpired method call through the Continuation interface has been cancelled, so the issues below became invalid, as discussed in CXF-7011.
https://issues.apache.org/jira/browse/CXF-7002 https://issues.apache.org/jira/browse/CXF-7011 instead such block means the same; https://issues.apache.org/jira/browse/CXF-7011?focusedCommentId=15422696&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15422696 Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e822ae58 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e822ae58 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e822ae58 Branch: refs/heads/camel-2.17.x Commit: e822ae58c43aec84cc4d1ff64df008ea22ddd910 Parents: 19f619e Author: önder sezgin <ondersez...@gmail.com> Authored: Tue Aug 16 22:22:46 2016 +0300 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 17 09:52:05 2016 +0200 ---------------------------------------------------------------------- .../org/apache/camel/component/cxf/CxfConsumer.java | 14 +++++++++++++- .../camel/component/cxf/jaxrs/CxfRsInvoker.java | 14 +++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e822ae58/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java index 11ec2f2..43561ba 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java @@ -26,6 +26,7 @@ import org.w3c.dom.Element; import org.apache.camel.AsyncCallback; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.Processor; import 
org.apache.camel.component.cxf.common.message.CxfConstants; import org.apache.camel.impl.DefaultConsumer; @@ -149,6 +150,8 @@ public class CxfConsumer extends DefaultConsumer { // The continuation could be called before the suspend is called continuation.suspend(cxfEndpoint.getContinuationTimeout()); + + continuation.setObject(camelExchange); // use the asynchronous API to process the exchange getAsyncProcessor().process(camelExchange, new AsyncCallback() { @@ -157,7 +160,6 @@ public class CxfConsumer extends DefaultConsumer { synchronized (continuation) { LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId()); // resume processing after both, sync and async callbacks - continuation.setObject(camelExchange); continuation.resume(); } } @@ -171,6 +173,16 @@ public class CxfConsumer extends DefaultConsumer { CxfConsumer.this.doneUoW(camelExchange); } + } else if (!continuation.isResumed() && !continuation.isPending()) { + org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); + try { + if (!continuation.isPending()) { + camelExchange.setException(new ExchangeTimedOutException(camelExchange, cxfEndpoint.getContinuationTimeout())); + } + setResponseBack(cxfExchange, camelExchange); + } finally { + CxfConsumer.this.doneUoW(camelExchange); + } } } return null; http://git-wip-us.apache.org/repos/asf/camel/blob/e822ae58/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java index fb999f0..01563d3 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java @@ -26,6 +26,7 @@ 
import javax.ws.rs.core.UriInfo; import org.apache.camel.AsyncCallback; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.RuntimeCamelException; import org.apache.cxf.continuations.Continuation; import org.apache.cxf.continuations.ContinuationProvider; @@ -90,13 +91,13 @@ public class CxfRsInvoker extends JAXRSInvoker { // The continuation could be called before the suspend is called continuation.suspend(endpoint.getContinuationTimeout()); cxfExchange.put(SUSPENED, Boolean.TRUE); + continuation.setObject(camelExchange); cxfRsConsumer.getAsyncProcessor().process(camelExchange, new AsyncCallback() { public void done(boolean doneSync) { // make sure the continuation resume will not be called before the suspend method in other thread synchronized (continuation) { LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId()); // resume processing after both, sync and async callbacks - continuation.setObject(camelExchange); continuation.resume(); } } @@ -111,6 +112,17 @@ public class CxfRsInvoker extends JAXRSInvoker { } finally { cxfRsConsumer.doneUoW(camelExchange); } + } else { + if (!continuation.isPending()) { + cxfExchange.put(SUSPENED, Boolean.FALSE); + org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); + camelExchange.setException(new ExchangeTimedOutException(camelExchange, endpoint.getContinuationTimeout())); + try { + return returnResponse(cxfExchange, camelExchange); + } finally { + cxfRsConsumer.doneUoW(camelExchange); + } + } } } return null;