Updated Branches:
  refs/heads/master 81d0a174b -> 24e01f171

CAMEL-6403: DefaultConsumer now allows handling the lifecycle of the Exchange UoW.
This is needed for HTTP/TCP consumers that can send back very large streaming-based
replies, because the UoW work should be done only after the reply has been sent.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/24e01f17
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/24e01f17
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/24e01f17

Branch: refs/heads/master
Commit: 24e01f17149ea3690d1fe4d2fabe4c78a5d88225
Parents: 81d0a17
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon Aug 12 19:45:11 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Aug 12 19:45:11 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/cxf/CxfConsumer.java | 43 ++++++++++++++------
 .../camel/component/cxf/jaxrs/CxfRsInvoker.java | 24 ++++++++---
 .../cxf/AbstractCXFGreeterRouterTest.java       |  1 +
 3 files changed, 49 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/24e01f17/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
index a8c29f8..cc8145e 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
@@ -82,7 +82,7 @@ public class CxfConsumer extends DefaultConsumer {
             private Object asyncInvoke(Exchange cxfExchange, final 
Continuation continuation) {
                 synchronized (continuation) {
                     if (continuation.isNew()) {
-                        final org.apache.camel.Exchange camelExchange = 
perpareCamelExchange(cxfExchange);
+                        final org.apache.camel.Exchange camelExchange = 
prepareCamelExchange(cxfExchange);
                         
                         // Now we don't set up the timeout value
                         LOG.trace("Suspending continuation of exchangeId: {}", 
camelExchange.getExchangeId());
@@ -104,9 +104,12 @@ public class CxfConsumer extends DefaultConsumer {
                         });
                         
                     } else if (continuation.isResumed()) {
-                        org.apache.camel.Exchange camelExchange = 
(org.apache.camel.Exchange)continuation
-                            .getObject();
-                        setResponseBack(cxfExchange, camelExchange);
+                        org.apache.camel.Exchange camelExchange = 
(org.apache.camel.Exchange)continuation.getObject();
+                        try {
+                            setResponseBack(cxfExchange, camelExchange);
+                        } finally {
+                            CxfConsumer.this.doneUoW(camelExchange);
+                        }
 
                     }
                 }
@@ -127,27 +130,41 @@ public class CxfConsumer extends DefaultConsumer {
             }
             
             private Object syncInvoke(Exchange cxfExchange) {
-                org.apache.camel.Exchange camelExchange = 
perpareCamelExchange(cxfExchange);               
-                // send Camel exchange to the target processor
-                LOG.trace("Processing +++ START +++");
+                org.apache.camel.Exchange camelExchange = 
prepareCamelExchange(cxfExchange);
                 try {
-                    getProcessor().process(camelExchange);
-                } catch (Exception e) {
-                    throw new Fault(e);
+                    try {
+                        LOG.trace("Processing +++ START +++");
+                        // send Camel exchange to the target processor
+                        getProcessor().process(camelExchange);
+                    } catch (Exception e) {
+                        throw new Fault(e);
+                    }
+
+                    LOG.trace("Processing +++ END +++");
+                    setResponseBack(cxfExchange, camelExchange);
+                } finally {
+                    doneUoW(camelExchange);
                 }
-                LOG.trace("Processing +++ END +++");
-                setResponseBack(cxfExchange, camelExchange);
+
                 // response should have been set in outMessage's content
                 return null;
             }
             
-            private org.apache.camel.Exchange perpareCamelExchange(Exchange 
cxfExchange) {
+            private org.apache.camel.Exchange prepareCamelExchange(Exchange 
cxfExchange) {
                 // get CXF binding
                 CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
                 CxfBinding binding = endpoint.getCxfBinding();
 
                 // create a Camel exchange, the default MEP is InOut
                 org.apache.camel.Exchange camelExchange = 
endpoint.createExchange();
+                // we want to handle the UoW
+                try {
+                    CxfConsumer.this.createUoW(camelExchange);
+                } catch (Exception e) {
+                    log.error("Error processing request", e);
+                    throw new Fault(e);
+                }
+
                 DataFormat dataFormat = endpoint.getDataFormat();
 
                 BindingOperationInfo boi = 
cxfExchange.getBindingOperationInfo();

http://git-wip-us.apache.org/repos/asf/camel/blob/24e01f17/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
index f9a8d1c..38bf420 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
@@ -76,6 +76,9 @@ public class CxfRsInvoker extends JAXRSInvoker {
                     ep = ExchangePattern.InOnly;
                 } 
                 final org.apache.camel.Exchange camelExchange = 
endpoint.createExchange(ep);
+                // we want to handle the UoW
+                cxfRsConsumer.createUoW(camelExchange);
+
                 CxfRsBinding binding = endpoint.getBinding();
                 binding.populateExchangeFromCxfRsRequest(cxfExchange, 
camelExchange, method, paramArray);
                 // Now we don't set up the timeout value
@@ -99,9 +102,12 @@ public class CxfRsInvoker extends JAXRSInvoker {
             }
             if (continuation.isResumed()) {
                 cxfExchange.put(SUSPENED, Boolean.FALSE);
-                org.apache.camel.Exchange camelExchange = 
(org.apache.camel.Exchange)continuation
-                    .getObject();
-                return returnResponse(cxfExchange, camelExchange);
+                org.apache.camel.Exchange camelExchange = 
(org.apache.camel.Exchange)continuation.getObject();
+                try {
+                    return returnResponse(cxfExchange, camelExchange);
+                } finally {
+                    cxfRsConsumer.doneUoW(camelExchange);
+                }
             }
         }
         return null;
@@ -115,16 +121,22 @@ public class CxfRsInvoker extends JAXRSInvoker {
             ep = ExchangePattern.InOnly;
         } 
         org.apache.camel.Exchange camelExchange = endpoint.createExchange(ep);
+        // we want to handle the UoW
+        cxfRsConsumer.createUoW(camelExchange);
         CxfRsBinding binding = endpoint.getBinding();
         binding.populateExchangeFromCxfRsRequest(cxfExchange, camelExchange, 
method, paramArray);
-        
+
         try {
             cxfRsConsumer.getProcessor().process(camelExchange);
         } catch (Exception exception) {
             camelExchange.setException(exception);
         }
-        return returnResponse(cxfExchange, camelExchange);
-        
+
+        try {
+            return returnResponse(cxfExchange, camelExchange);
+        } finally {
+            cxfRsConsumer.doneUoW(camelExchange);
+        }
     }
     
     private Object returnResponse(Exchange cxfExchange, 
org.apache.camel.Exchange camelExchange) throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/24e01f17/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/AbstractCXFGreeterRouterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/AbstractCXFGreeterRouterTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/AbstractCXFGreeterRouterTest.java
index 7b3b92c..223ef21 100644
--- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/AbstractCXFGreeterRouterTest.java
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/AbstractCXFGreeterRouterTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.cxf;
 
 import javax.xml.namespace.QName;
 import javax.xml.ws.Service;
+import javax.xml.ws.soap.SOAPFaultException;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.RuntimeCamelException;

Reply via email to