Author: ningjiang Date: Tue Sep 28 10:13:57 2010 New Revision: 1002095 URL: http://svn.apache.org/viewvc?rev=1002095&view=rev Log: CAMEL-3169 Add a synchronized block on the continuation to prevent the continuation from being resumed in one thread before it has been suspended in the other thread
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?rev=1002095&r1=1002094&r2=1002095&view=diff ============================================================================== --- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original) +++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Tue Sep 28 10:13:57 2010 @@ -71,47 +71,54 @@ public class CxfConsumer extends Default } private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) { - if (continuation.isNew()) { - final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange); - - // use the asynchronous API to process the exchange - boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() { - public void done(boolean doneSync) { + synchronized (continuation) { + if (continuation.isNew()) { + final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange); + + // use the asynchronous API to process the exchange + boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() { + public void done(boolean doneSync) { + // make sure the continuation resume will not be called before the suspend method in other thread + synchronized (continuation) { + if (LOG.isTraceEnabled()) { + LOG.trace("Resuming continuation of exchangeId: " + + camelExchange.getExchangeId()); + } + // resume processing after both, sync and async callbacks + continuation.setObject(camelExchange); + continuation.resume(); + } + } + }); + // just need to avoid the continuation.resume is called + // before the continuation.suspend is called + if (continuation.getObject() != 
camelExchange && !sync) { + // Now we don't set up the timeout value if (LOG.isTraceEnabled()) { - LOG.trace("Resuming continuation of exchangeId: " + LOG.trace("Suspending continuation of exchangeId: " + camelExchange.getExchangeId()); } - // resume processing after both, sync and async callbacks - continuation.setObject(camelExchange); - continuation.resume(); - } - }); - // just need to avoid the continuation.resume is called - // before the continuation.suspend is called - if (continuation.getObject() != camelExchange && !sync) { - // Now we don't set up the timeout value - if (LOG.isTraceEnabled()) { - LOG.trace("Suspending continuation of exchangeId: " - + camelExchange.getExchangeId()); - } - // The continuation could be called before the suspend - // is called - continuation.suspend(0); - } else { - // just set the response back, as the invoking thread is - // not changed - if (LOG.isTraceEnabled()) { - LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId()); + // The continuation could be called before the + // suspend + // is called + continuation.suspend(0); + } else { + // just set the response back, as the invoking + // thread is + // not changed + if (LOG.isTraceEnabled()) { + LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId()); + } + setResponseBack(cxfExchange, camelExchange); } - setResponseBack(cxfExchange, camelExchange); - } - } - if (continuation.isResumed()) { - org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation - .getObject(); - setResponseBack(cxfExchange, camelExchange); + } + if (continuation.isResumed()) { + org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation + .getObject(); + setResponseBack(cxfExchange, camelExchange); + } } return null; }