Repository: camel
Updated Branches:
  refs/heads/master 35f172e69 -> e9091d3fc


CAMEL-8419 Camel StreamCache does not work with CXF consumer for InOut messages
* Closing UnitOfWork in Interceptor/MessageObserver


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e9091d3f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e9091d3f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e9091d3f

Branch: refs/heads/master
Commit: e9091d3fc687973de5d876cdd83fc72f31611fb9
Parents: 35f172e
Author: Sami Nurminen <snurm...@gmail.com>
Authored: Thu Jun 15 23:38:01 2017 +0300
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Jun 29 13:53:38 2017 +0200

----------------------------------------------------------------------
 .../apache/camel/component/cxf/CxfConsumer.java |  49 +++++++-
 .../component/cxf/jaxrs/CxfRsConsumer.java      |  49 +++++++-
 .../camel/component/cxf/jaxrs/CxfRsInvoker.java |  12 +-
 ...nsumerClientDisconnectedSynchronousTest.java |  28 +++++
 .../cxf/CxfConsumerClientDisconnectedTest.java  | 105 ++++++++++++++++
 .../CxfConsumerStreamCacheSynchronousTest.java  |  26 ++++
 .../cxf/CxfConsumerStreamCacheTest.java         | 118 ++++++++++++++++++
 ...nsumerClientDisconnectedSynchronousTest.java |  25 ++++
 .../CxfRsConsumerClientDisconnectedTest.java    | 108 +++++++++++++++++
 .../jaxrs/CxfRsStreamCacheSynchronousTest.java  |  25 ++++
 .../cxf/jaxrs/CxfRsStreamCacheTest.java         | 120 +++++++++++++++++++
 11 files changed, 657 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
index 808d58b..0e5478a 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
@@ -21,7 +21,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import javax.xml.ws.WebFault;
-
 import org.w3c.dom.Element;
 
 import org.apache.camel.AsyncCallback;
@@ -31,6 +30,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
+
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.endpoint.Server;
@@ -39,8 +39,11 @@ import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.FaultMode;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
 import org.apache.cxf.service.invoker.Invoker;
 import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.ContextUtils;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.slf4j.Logger;
@@ -77,9 +80,42 @@ public class CxfConsumer extends DefaultConsumer {
         if (ObjectHelper.isNotEmpty(cxfEndpoint.getPublishedEndpointUrl())) {
             
server.getEndpoint().getEndpointInfo().setProperty("publishedEndpointUrl", 
cxfEndpoint.getPublishedEndpointUrl());
         }
+
+        final MessageObserver originalOutFaultObserver = 
server.getEndpoint().getOutFaultObserver();
+        server.getEndpoint().setOutFaultObserver(message -> {
+            Exchange cxfExchange = null;
+            if ((cxfExchange = message.getExchange()) != null) {
+                org.apache.camel.Exchange exchange = 
cxfExchange.get(org.apache.camel.Exchange.class);
+                if (exchange != null) {
+                    doneUoW(exchange);
+                }
+            }
+            originalOutFaultObserver.onMessage(message);
+        });
+
+        server.getEndpoint().getOutInterceptors().add(new 
UnitOfWorkCloserInterceptor());
+
         return server;
     }
 
+    //closes UnitOfWork in good case
+    private class UnitOfWorkCloserInterceptor extends 
AbstractPhaseInterceptor<Message> {
+        public UnitOfWorkCloserInterceptor() {
+            super(Phase.POST_LOGICAL_ENDING);
+        }
+
+        @Override
+        public void handleMessage(Message message) throws Fault {
+            Exchange cxfExchange = null;
+            if ((cxfExchange = message.getExchange()) != null) {
+                org.apache.camel.Exchange exchange = 
cxfExchange.get(org.apache.camel.Exchange.class);
+                if (exchange != null) {
+                    doneUoW(exchange);
+                }
+            }
+        }
+    }
+
     public Server getServer() {
         return server;
     }
@@ -179,8 +215,9 @@ public class CxfConsumer extends DefaultConsumer {
                     org.apache.camel.Exchange camelExchange = 
(org.apache.camel.Exchange)continuation.getObject();
                     try {
                         setResponseBack(cxfExchange, camelExchange);
-                    } finally {
+                    } catch (Exception ex) {
                         CxfConsumer.this.doneUoW(camelExchange);
+                        throw ex;
                     }
 
                 } else if (!continuation.isResumed() && 
!continuation.isPending()) {
@@ -190,8 +227,9 @@ public class CxfConsumer extends DefaultConsumer {
                             camelExchange.setException(new 
ExchangeTimedOutException(camelExchange, cxfEndpoint.getContinuationTimeout()));
                         }
                         setResponseBack(cxfExchange, camelExchange);
-                    } finally {
+                    } catch (Exception ex) {
                         CxfConsumer.this.doneUoW(camelExchange);
+                        throw ex;
                     }
                 }
             }
@@ -224,8 +262,9 @@ public class CxfConsumer extends DefaultConsumer {
 
                 LOG.trace("Processing +++ END +++");
                 setResponseBack(cxfExchange, camelExchange);
-            } finally {
+            }  catch (Exception ex) {
                 doneUoW(camelExchange);
+                throw ex;
             }
             // response should have been set in outMessage's content
             return null;
@@ -238,6 +277,8 @@ public class CxfConsumer extends DefaultConsumer {
 
             // create a Camel exchange, the default MEP is InOut
             org.apache.camel.Exchange camelExchange = 
endpoint.createExchange();
+            //needs access in MessageObserver/Interceptor to close the 
UnitOfWork
+            cxfExchange.put(org.apache.camel.Exchange.class, camelExchange);
 
             DataFormat dataFormat = endpoint.getDataFormat();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
index 34949ae..e97caa9 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
@@ -16,11 +16,18 @@
  */
 package org.apache.camel.component.cxf.jaxrs;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.OutFaultChainInitiatorObserver;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.transport.MessageObserver;
 
 /**
  * A Consumer of exchanges for a JAXRS service in CXF.  CxfRsConsumer acts a 
CXF
@@ -41,12 +48,52 @@ public class CxfRsConsumer extends DefaultConsumer {
         CxfRsInvoker cxfRsInvoker = new CxfRsInvoker(endpoint, this);
         JAXRSServerFactoryBean svrBean = 
endpoint.createJAXRSServerFactoryBean();
         Bus bus = endpoint.getBus();
+
         // We need to apply the bus setting from the CxfRsEndpoint which does 
not use the default bus
         if (bus != null) {
             svrBean.setBus(bus);
+
         }
+
         svrBean.setInvoker(cxfRsInvoker);
-        return svrBean.create();
+
+        svrBean.getOutInterceptors().add(new UnitOfWorkCloserInterceptor());
+
+
+        Server server = svrBean.create();
+
+        final MessageObserver originalOutFaultObserver = 
server.getEndpoint().getOutFaultObserver();
+        //proxy OutFaultObserver so we can close 
org.apache.camel.spi.UnitOfWork in case of error
+        server.getEndpoint().setOutFaultObserver(message -> {
+            org.apache.cxf.message.Exchange cxfExchange = null;
+            if ((cxfExchange = message.getExchange()) != null) {
+                org.apache.camel.Exchange exchange = 
cxfExchange.get(org.apache.camel.Exchange.class);
+                if (exchange != null) {
+                    doneUoW(exchange);
+                }
+            }
+            originalOutFaultObserver.onMessage(message);
+        });
+
+        return server;
+    }
+
+    //closes UnitOfWork in good case
+    private class UnitOfWorkCloserInterceptor extends 
AbstractPhaseInterceptor<Message> {
+        public UnitOfWorkCloserInterceptor() {
+            super(Phase.POST_LOGICAL_ENDING);
+        }
+
+        @Override
+        public void handleMessage(Message message) throws Fault {
+            org.apache.cxf.message.Exchange cxfExchange = null;
+            if ((cxfExchange = message.getExchange()) != null) {
+                org.apache.camel.Exchange exchange = 
cxfExchange.get(org.apache.camel.Exchange.class);
+                if (exchange != null) {
+                    doneUoW(exchange);
+                }
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
index 01563d3..29d9fa3 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
@@ -109,8 +109,9 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 org.apache.camel.Exchange camelExchange = 
(org.apache.camel.Exchange)continuation.getObject();
                 try {
                     return returnResponse(cxfExchange, camelExchange);
-                } finally {
+                } catch (Exception ex) {
                     cxfRsConsumer.doneUoW(camelExchange);
+                    throw ex;
                 }
             } else {
                 if (!continuation.isPending()) {
@@ -119,8 +120,9 @@ public class CxfRsInvoker extends JAXRSInvoker {
                     camelExchange.setException(new 
ExchangeTimedOutException(camelExchange, endpoint.getContinuationTimeout()));
                     try {
                         return returnResponse(cxfExchange, camelExchange);
-                    } finally {
+                    } catch (Exception ex) {
                         cxfRsConsumer.doneUoW(camelExchange);
+                        throw ex;
                     }
                 }
             }
@@ -143,8 +145,9 @@ public class CxfRsInvoker extends JAXRSInvoker {
 
         try {
             return returnResponse(cxfExchange, camelExchange);
-        } finally {
+        } catch (Exception ex) {
             cxfRsConsumer.doneUoW(camelExchange);
+            throw  ex;
         }
     }
     
@@ -155,6 +158,9 @@ public class CxfRsInvoker extends JAXRSInvoker {
             ep = ExchangePattern.InOnly;
         } 
         final org.apache.camel.Exchange camelExchange = 
endpoint.createExchange(ep);
+        //needs access in MessageObserver/Interceptor to close the UnitOfWork
+        cxfExchange.put(org.apache.camel.Exchange.class, camelExchange);
+
         if (response != null) {
             camelExchange.getOut().setBody(response);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedSynchronousTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedSynchronousTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedSynchronousTest.java
new file mode 100644
index 0000000..c9707d9
--- /dev/null
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedSynchronousTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf;
+
+
+
+
+public class CxfConsumerClientDisconnectedSynchronousTest extends 
CxfConsumerClientDisconnectedTest {
+
+    protected boolean isSynchronous() {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java
new file mode 100644
index 0000000..aba3ccc
--- /dev/null
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.commons.net.telnet.TelnetClient;
+import org.junit.Test;
+
+/**
+ * UnitOfWork should complete even if client disconnected during the 
processing.
+ */
+public class CxfConsumerClientDisconnectedTest extends CamelTestSupport {
+    private static final int PORT = CXFTestSupport.getPort1();
+    private static final String CONTEXT = "/CxfConsumerClientDisconnectedTest";
+    private static final String CXT = PORT + CONTEXT;
+
+    private String cxfRsEndpointUri = "cxf://http://localhost:" + CXT + 
"/rest?synchronous=" + isSynchronous()
+                                      + 
"&serviceClass=org.apache.camel.component.cxf.ServiceProvider&dataFormat=PAYLOAD";
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+
+        return new RouteBuilder() {
+            public void configure() {
+
+                getContext().setStreamCaching(true);
+                getContext().getStreamCachingStrategy().setSpoolThreshold(1L);
+                errorHandler(noErrorHandler());
+
+                Response ok = Response.ok().build();
+
+                from(cxfRsEndpointUri)
+                    // should be able to convert to Customer
+                    .to("mock:result")
+                    .process(exchange-> {
+                        Thread.sleep(100);
+
+                        exchange.addOnCompletion(new Synchronization() {
+                            @Override
+                            public void onComplete(Exchange exchange) {
+                                template.sendBody("mock:onComplete", "");
+                            }
+
+                            @Override
+                            public void onFailure(Exchange exchange) {
+
+                            }
+                        });
+                    });
+
+            };
+        };
+    }
+
+    @Test
+    public void testClientDisconnect() throws Exception {
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        MockEndpoint onComplete = getMockEndpoint("mock:onComplete");
+        onComplete.expectedMessageCount(1);
+
+        TelnetClient telnetClient = new TelnetClient();
+
+        telnetClient.connect("localhost", PORT);
+        telnetClient.setTcpNoDelay(true);
+        telnetClient.setReceiveBufferSize(1);
+
+        BufferedWriter writer = new BufferedWriter(new 
OutputStreamWriter(telnetClient.getOutputStream()));
+        writer.write("GET " + CONTEXT + "/rest/customerservice/customers 
HTTP/1.1\nhost: localhost\n\n");
+        writer.flush();
+        telnetClient.disconnect();
+        mock.assertIsSatisfied();
+        onComplete.assertIsSatisfied();
+
+    }
+
+    protected boolean isSynchronous() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheSynchronousTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheSynchronousTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheSynchronousTest.java
new file mode 100644
index 0000000..a50837c
--- /dev/null
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheSynchronousTest.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cxf;
+
+public class CxfConsumerStreamCacheSynchronousTest extends 
CxfConsumerStreamCacheTest {
+
+    @Override
+    protected boolean isSynchronous() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java
new file mode 100644
index 0000000..07e12fc
--- /dev/null
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cxf;
+
+import org.w3c.dom.Node;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Test;
+
+
+//Modified from 
https://issues.apache.org/jira/secure/attachment/12730161/0001-CAMEL-8419-Camel-StreamCache-does-not-work-with-CXF-.patch
+public class CxfConsumerStreamCacheTest extends CamelTestSupport {
+    
+    protected static final String REQUEST_MESSAGE = "<soapenv:Envelope 
xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" 
xmlns:ser=\"test/service\">"
+        + 
"<soapenv:Header/><soapenv:Body><ser:ping/></soapenv:Body></soapenv:Envelope>";
+    
+    protected static final String RESPONSE_MESSAGE_BEGINE = "<soap:Envelope 
xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+        + "<soap:Body><pong xmlns=\"test/service\"";
+    protected static final String RESPONSE_MESSAGE_END = 
"/></soap:Body></soap:Envelope>";
+    
+    protected static final String RESPONSE = "<pong xmlns=\"test/service\"/>";
+
+    protected final String simpleEndpointAddress = "http://localhost:"
+        + CXFTestSupport.getPort1() + "/" + getClass().getSimpleName() + 
"/test";
+    protected final String simpleEndpointURI = "cxf://" + simpleEndpointAddress
+        + "?synchronous=" + isSynchronous() + 
"&serviceClass=org.apache.camel.component.cxf.ServiceProvider&dataFormat=PAYLOAD";
+
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                getContext().setStreamCaching(true);
+                getContext().getStreamCachingStrategy().setSpoolThreshold(1L);
+                errorHandler(noErrorHandler());
+                from(getFromEndpointUri()).process(new Processor() {
+                    public void process(final Exchange exchange) throws 
Exception {
+                        Message in = exchange.getIn();
+                        Node node = in.getBody(Node.class);
+                        assertNotNull(node);
+                        CachedOutputStream cos = new 
CachedOutputStream(exchange);
+                        cos.write(RESPONSE.getBytes("UTF-8"));
+                        cos.close();
+                        exchange.getOut().setBody(cos.newStreamCache());
+
+                        exchange.addOnCompletion(new Synchronization() {
+                            @Override
+                            public void onComplete(Exchange exchange) {
+                                template.sendBody("mock:onComplete", "");
+                            }
+
+                            @Override
+                            public void onFailure(Exchange exchange) {
+
+                            }
+                        });
+                    }
+                });
+            }
+        };
+    }
+
+    @Test
+    public void testInvokingServiceFromHttpCompnent() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:onComplete");
+        mock.expectedMessageCount(2);
+
+        // call the service with right post message
+        
+        String response = template.requestBody(simpleEndpointAddress, 
REQUEST_MESSAGE, String.class);
+        assertTrue("Get a wrong response ", 
response.startsWith(RESPONSE_MESSAGE_BEGINE));
+        assertTrue("Get a wrong response ", 
response.endsWith(RESPONSE_MESSAGE_END));
+        try {
+            template.requestBody(simpleEndpointAddress, null, String.class);
+            fail("Excpetion to get exception here");
+        } catch (Exception ex) {
+            // do nothing here
+        }
+       
+        response = template.requestBody(simpleEndpointAddress, 
REQUEST_MESSAGE, String.class);
+        assertTrue("Get a wrong response ", 
response.startsWith(RESPONSE_MESSAGE_BEGINE));
+        assertTrue("Get a wrong response ", 
response.endsWith(RESPONSE_MESSAGE_END));
+        mock.assertIsSatisfied();
+    }
+    
+    protected String getFromEndpointUri() {
+        return simpleEndpointURI;
+    }
+
+    protected boolean isSynchronous() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedSynchronousTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedSynchronousTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedSynchronousTest.java
new file mode 100644
index 0000000..8c58f1d
--- /dev/null
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedSynchronousTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.jaxrs;
+
+public class CxfRsConsumerClientDisconnectedSynchronousTest extends 
CxfRsConsumerClientDisconnectedTest {
+
+    protected boolean isSynchronous() {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java
new file mode 100644
index 0000000..4a4a47c
--- /dev/null
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.jaxrs;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cxf.CXFTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.commons.net.telnet.TelnetClient;
+import org.junit.Test;
+
+/**
+ * UnitOfWork should complete even if client disconnected during the processing.
+ */
+public class CxfRsConsumerClientDisconnectedTest extends CamelTestSupport {
+    private static final int PORT = CXFTestSupport.getPort1();
+    private static final String CONTEXT = "/CxfRsConsumerClientDisconnectedTest";
+    private static final String CXT = PORT + CONTEXT;
+
+    private String cxfRsEndpointUri = "cxfrs://http://localhost:" + CXT + "/rest?synchronous=" + isSynchronous()
+                                      + "&dataFormat=PAYLOAD&resourceClasses=org.apache.camel.component.cxf.jaxrs.testbean.CustomerService";
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+
+        return new RouteBuilder() {
+            public void configure() {
+
+                getContext().setStreamCaching(true);
+                getContext().getStreamCachingStrategy().setSpoolThreshold(1L);
+                errorHandler(noErrorHandler());
+
+                Response ok = Response.ok().build();
+
+                from(cxfRsEndpointUri)
+                    // should be able to convert to Customer
+                    .to("mock:result")
+                    .process(exchange-> {
+                        Thread.sleep(100);
+
+                        exchange.addOnCompletion(new Synchronization() {
+                            @Override
+                            public void onComplete(Exchange exchange) {
+                                template.sendBody("mock:onComplete", "");
+                            }
+
+                            @Override
+                            public void onFailure(Exchange exchange) {
+
+                            }
+                        });
+                    });
+
+            };
+        };
+    }
+
+    @Test
+    public void testClientDisconnect() throws Exception {
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        MockEndpoint onComplete = getMockEndpoint("mock:onComplete");
+        onComplete.expectedMessageCount(1);
+
+        TelnetClient telnetClient = new TelnetClient();
+
+        telnetClient.connect("localhost", PORT);
+        telnetClient.setTcpNoDelay(true);
+        telnetClient.setReceiveBufferSize(1);
+
+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(telnetClient.getOutputStream()));
+        writer.write("GET " + CONTEXT + "/rest/customerservice/customers HTTP/1.1\nhost: localhost\n\n");
+        writer.flush();
+        telnetClient.disconnect();
+        mock.assertIsSatisfied();
+        onComplete.assertIsSatisfied();
+
+
+
+    }
+
+    protected boolean isSynchronous() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheSynchronousTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheSynchronousTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheSynchronousTest.java
new file mode 100644
index 0000000..99a2925
--- /dev/null
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheSynchronousTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.jaxrs;
+
+public class CxfRsStreamCacheSynchronousTest extends CxfRsStreamCacheTest {
+
+    @Override
+    protected boolean isSynchronous() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java
new file mode 100644
index 0000000..7a97b40
--- /dev/null
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.jaxrs;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cxf.CXFTestSupport;
+import org.apache.camel.component.cxf.jaxrs.testbean.Customer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.junit.Test;
+
+
+public class CxfRsStreamCacheTest extends CamelTestSupport {
+    private static final String PUT_REQUEST = "<Customer><name>Mary</name><id>123</id></Customer>";
+    private static final String CONTEXT = "/CxfRsStreamCacheTest";
+    private static final String CXT = CXFTestSupport.getPort1() + CONTEXT;
+    private static final String RESPONSE = "<pong xmlns=\"test/service\"/>";
+
+    private String cxfRsEndpointUri = "cxfrs://http://localhost:" + CXT + "/rest?synchronous=" + isSynchronous()
+                                      + "&dataFormat=PAYLOAD&resourceClasses=org.apache.camel.component.cxf.jaxrs.testbean.CustomerService";
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+
+        return new RouteBuilder() {
+            public void configure() {
+
+                getContext().setStreamCaching(true);
+                getContext().getStreamCachingStrategy().setSpoolThreshold(1L);
+                errorHandler(noErrorHandler());
+
+                Response ok = Response.ok().build();
+
+                from(cxfRsEndpointUri)
+                    // should be able to convert to Customer
+                    .convertBodyTo(Customer.class)
+                    .to("mock:result")
+                    .process(exchange-> {
+                        // respond with OK
+                        CachedOutputStream cos = new CachedOutputStream(exchange);
+                        cos.write(RESPONSE.getBytes("UTF-8"));
+                        cos.close();
+                        exchange.getOut().setBody(cos.newStreamCache());
+
+                        exchange.addOnCompletion(new Synchronization() {
+                            @Override
+                            public void onComplete(Exchange exchange) {
+                                template.sendBody("mock:onComplete", "");
+                            }
+
+                            @Override
+                            public void onFailure(Exchange exchange) {
+
+                            }
+                        });
+                    });
+
+            };
+        };
+    }
+
+    @Test
+    public void testPutConsumer() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.message(0).body().isInstanceOf(Customer.class);
+
+        MockEndpoint onComplete = getMockEndpoint("mock:onComplete");
+        onComplete.expectedMessageCount(1);
+
+
+        HttpPut put = new HttpPut("http://localhost:" + CXT + "/rest/customerservice/customers");
+        StringEntity entity = new StringEntity(PUT_REQUEST, "ISO-8859-1");
+        entity.setContentType("text/xml; charset=ISO-8859-1");
+        put.addHeader("test", "header1;header2");
+        put.setEntity(entity);
+        CloseableHttpClient httpclient = HttpClientBuilder.create().build();
+
+        try {
+            HttpResponse response = httpclient.execute(put);
+            assertEquals(200, response.getStatusLine().getStatusCode());
+            assertEquals(RESPONSE, EntityUtils.toString(response.getEntity()));
+        } finally {
+            httpclient.close();
+        }
+
+        mock.assertIsSatisfied();
+        onComplete.assertIsSatisfied();
+
+    }
+
+    protected boolean isSynchronous() {
+        return false;
+    }
+
+}

Reply via email to