Updated Branches: refs/heads/master 81d0a174b -> 24e01f171
CAMEL-6403: DefaultConsumer allows to handle the lifecycle of Exchange UoW, which is needed for HTTP/TCP consumers which can send back very large streaming based replies. As the UoW work should be done after the reply has been sent. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/24e01f17 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/24e01f17 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/24e01f17 Branch: refs/heads/master Commit: 24e01f17149ea3690d1fe4d2fabe4c78a5d88225 Parents: 81d0a17 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Aug 12 19:45:11 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Aug 12 19:45:11 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/cxf/CxfConsumer.java | 43 ++++++++++++++------ .../camel/component/cxf/jaxrs/CxfRsInvoker.java | 24 ++++++++--- .../cxf/AbstractCXFGreeterRouterTest.java | 1 + 3 files changed, 49 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/24e01f17/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java index a8c29f8..cc8145e 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java @@ -82,7 +82,7 @@ public class CxfConsumer extends DefaultConsumer { private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) { synchronized (continuation) { if (continuation.isNew()) { - final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange); + final org.apache.camel.Exchange camelExchange = prepareCamelExchange(cxfExchange); // Now we don't set up the timeout value LOG.trace("Suspending continuation of exchangeId: {}", camelExchange.getExchangeId()); @@ -104,9 +104,12 @@ public class CxfConsumer extends DefaultConsumer { }); } else if (continuation.isResumed()) { - org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation - .getObject(); - setResponseBack(cxfExchange, camelExchange); + org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); + try { + setResponseBack(cxfExchange, camelExchange); + } finally { + CxfConsumer.this.doneUoW(camelExchange); + } } } @@ -127,27 +130,41 @@ public class CxfConsumer extends DefaultConsumer { } private Object syncInvoke(Exchange cxfExchange) { - org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange); - // send Camel exchange to the target processor - LOG.trace("Processing +++ START +++"); + org.apache.camel.Exchange camelExchange = prepareCamelExchange(cxfExchange); try { - getProcessor().process(camelExchange); - } catch (Exception e) { - throw new Fault(e); + try { + LOG.trace("Processing +++ START +++"); + // send Camel exchange to the target processor + getProcessor().process(camelExchange); + } catch (Exception e) { + throw new Fault(e); + } + + LOG.trace("Processing +++ END +++"); + setResponseBack(cxfExchange, camelExchange); + } finally { + doneUoW(camelExchange); } - LOG.trace("Processing +++ END +++"); - setResponseBack(cxfExchange, camelExchange); + // response should have been set in outMessage's content return null; } - private org.apache.camel.Exchange perpareCamelExchange(Exchange cxfExchange) { + private org.apache.camel.Exchange prepareCamelExchange(Exchange cxfExchange) { // get CXF binding CxfEndpoint endpoint = (CxfEndpoint)getEndpoint(); CxfBinding binding = endpoint.getCxfBinding(); // create a Camel exchange, the default MEP is InOut org.apache.camel.Exchange camelExchange = endpoint.createExchange(); + // we want to handle the UoW + try { + CxfConsumer.this.createUoW(camelExchange); + } catch (Exception e) { + log.error("Error processing request", e); + throw new Fault(e); + } + DataFormat dataFormat = endpoint.getDataFormat(); BindingOperationInfo boi = cxfExchange.getBindingOperationInfo(); http://git-wip-us.apache.org/repos/asf/camel/blob/24e01f17/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java index f9a8d1c..38bf420 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java @@ -76,6 +76,9 @@ public class CxfRsInvoker extends JAXRSInvoker { ep = ExchangePattern.InOnly; } final org.apache.camel.Exchange camelExchange = endpoint.createExchange(ep); + // we want to handle the UoW + cxfRsConsumer.createUoW(camelExchange); + CxfRsBinding binding = endpoint.getBinding(); binding.populateExchangeFromCxfRsRequest(cxfExchange, camelExchange, method, paramArray); // Now we don't set up the timeout value @@ -99,9 +102,12 @@ public class CxfRsInvoker extends JAXRSInvoker { } if (continuation.isResumed()) { cxfExchange.put(SUSPENED, Boolean.FALSE); - org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation - .getObject(); - return returnResponse(cxfExchange, camelExchange); + org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); + try { + return returnResponse(cxfExchange, camelExchange); + } finally { + cxfRsConsumer.doneUoW(camelExchange); + } } } return null; @@ -115,16 +121,22 @@ public class CxfRsInvoker extends JAXRSInvoker { ep = ExchangePattern.InOnly; } org.apache.camel.Exchange camelExchange = endpoint.createExchange(ep); + // we want to handle the UoW + cxfRsConsumer.createUoW(camelExchange); CxfRsBinding binding = endpoint.getBinding(); binding.populateExchangeFromCxfRsRequest(cxfExchange, camelExchange, method, paramArray); - + try { cxfRsConsumer.getProcessor().process(camelExchange); } catch (Exception exception) { camelExchange.setException(exception); } - return returnResponse(cxfExchange, camelExchange); - + + try { + return returnResponse(cxfExchange, camelExchange); + } finally { + cxfRsConsumer.doneUoW(camelExchange); + } } private Object returnResponse(Exchange cxfExchange, org.apache.camel.Exchange camelExchange) throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/24e01f17/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/AbstractCXFGreeterRouterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/AbstractCXFGreeterRouterTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/AbstractCXFGreeterRouterTest.java index 7b3b92c..223ef21 100644 --- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/AbstractCXFGreeterRouterTest.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/AbstractCXFGreeterRouterTest.java @@ -18,6 +18,7 @@ package org.apache.camel.component.cxf; import javax.xml.namespace.QName; import javax.xml.ws.Service; +import javax.xml.ws.soap.SOAPFaultException; import org.apache.camel.CamelContext; import org.apache.camel.RuntimeCamelException;