Repository: camel Updated Branches: refs/heads/master 35f172e69 -> e9091d3fc
CAMEL-8419 Camel StreamCache does not work with CXF consumer for InOut messages * Closing UnitOfWork in Interceptor/MessageObserver Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e9091d3f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e9091d3f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e9091d3f Branch: refs/heads/master Commit: e9091d3fc687973de5d876cdd83fc72f31611fb9 Parents: 35f172e Author: Sami Nurminen <snurm...@gmail.com> Authored: Thu Jun 15 23:38:01 2017 +0300 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Jun 29 13:53:38 2017 +0200 ---------------------------------------------------------------------- .../apache/camel/component/cxf/CxfConsumer.java | 49 +++++++- .../component/cxf/jaxrs/CxfRsConsumer.java | 49 +++++++- .../camel/component/cxf/jaxrs/CxfRsInvoker.java | 12 +- ...nsumerClientDisconnectedSynchronousTest.java | 28 +++++ .../cxf/CxfConsumerClientDisconnectedTest.java | 105 ++++++++++++++++ .../CxfConsumerStreamCacheSynchronousTest.java | 26 ++++ .../cxf/CxfConsumerStreamCacheTest.java | 118 ++++++++++++++++++ ...nsumerClientDisconnectedSynchronousTest.java | 25 ++++ .../CxfRsConsumerClientDisconnectedTest.java | 108 +++++++++++++++++ .../jaxrs/CxfRsStreamCacheSynchronousTest.java | 25 ++++ .../cxf/jaxrs/CxfRsStreamCacheTest.java | 120 +++++++++++++++++++ 11 files changed, 657 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java index 808d58b..0e5478a 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.Map; import javax.xml.ws.WebFault; - import org.w3c.dom.Element; import org.apache.camel.AsyncCallback; @@ -31,6 +30,7 @@ import org.apache.camel.Processor; import org.apache.camel.component.cxf.common.message.CxfConstants; import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.util.ObjectHelper; + import org.apache.cxf.continuations.Continuation; import org.apache.cxf.continuations.ContinuationProvider; import org.apache.cxf.endpoint.Server; @@ -39,8 +39,11 @@ import org.apache.cxf.interceptor.Fault; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.FaultMode; import org.apache.cxf.message.Message; +import org.apache.cxf.phase.AbstractPhaseInterceptor; +import org.apache.cxf.phase.Phase; import org.apache.cxf.service.invoker.Invoker; import org.apache.cxf.service.model.BindingOperationInfo; +import org.apache.cxf.transport.MessageObserver; import org.apache.cxf.ws.addressing.ContextUtils; import org.apache.cxf.ws.addressing.EndpointReferenceType; import org.slf4j.Logger; @@ -77,9 +80,42 @@ public class CxfConsumer extends DefaultConsumer { if (ObjectHelper.isNotEmpty(cxfEndpoint.getPublishedEndpointUrl())) { server.getEndpoint().getEndpointInfo().setProperty("publishedEndpointUrl", cxfEndpoint.getPublishedEndpointUrl()); } + + final MessageObserver originalOutFaultObserver = server.getEndpoint().getOutFaultObserver(); + server.getEndpoint().setOutFaultObserver(message -> { + Exchange cxfExchange = null; + if ((cxfExchange = message.getExchange()) != null) { + org.apache.camel.Exchange exchange = cxfExchange.get(org.apache.camel.Exchange.class); + if (exchange != null) { + doneUoW(exchange); + } + } + originalOutFaultObserver.onMessage(message); + }); + + server.getEndpoint().getOutInterceptors().add(new UnitOfWorkCloserInterceptor()); + return server; } + //closes UnitOfWork in good case + private class UnitOfWorkCloserInterceptor extends AbstractPhaseInterceptor<Message> { + public UnitOfWorkCloserInterceptor() { + super(Phase.POST_LOGICAL_ENDING); + } + + @Override + public void handleMessage(Message message) throws Fault { + Exchange cxfExchange = null; + if ((cxfExchange = message.getExchange()) != null) { + org.apache.camel.Exchange exchange = cxfExchange.get(org.apache.camel.Exchange.class); + if (exchange != null) { + doneUoW(exchange); + } + } + } + } + public Server getServer() { return server; } @@ -179,8 +215,9 @@ public class CxfConsumer extends DefaultConsumer { org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); try { setResponseBack(cxfExchange, camelExchange); - } finally { + } catch (Exception ex) { CxfConsumer.this.doneUoW(camelExchange); + throw ex; } } else if (!continuation.isResumed() && !continuation.isPending()) { @@ -190,8 +227,9 @@ public class CxfConsumer extends DefaultConsumer { camelExchange.setException(new ExchangeTimedOutException(camelExchange, cxfEndpoint.getContinuationTimeout())); } setResponseBack(cxfExchange, camelExchange); - } finally { + } catch (Exception ex) { CxfConsumer.this.doneUoW(camelExchange); + throw ex; } } } @@ -224,8 +262,9 @@ public class CxfConsumer extends DefaultConsumer { LOG.trace("Processing +++ END +++"); setResponseBack(cxfExchange, camelExchange); - } finally { + } catch (Exception ex) { doneUoW(camelExchange); + throw ex; } // response should have been set in outMessage's content return null; @@ -238,6 +277,8 @@ public class CxfConsumer extends DefaultConsumer { // create a Camel exchange, the default MEP is InOut org.apache.camel.Exchange camelExchange = endpoint.createExchange(); + //needs access in MessageObserver/Interceptor to close the UnitOfWork + cxfExchange.put(org.apache.camel.Exchange.class, camelExchange); DataFormat dataFormat = endpoint.getDataFormat(); http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java index 34949ae..e97caa9 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java @@ -16,11 +16,18 @@ */ package org.apache.camel.component.cxf.jaxrs; +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; import org.apache.cxf.Bus; import org.apache.cxf.endpoint.Server; +import org.apache.cxf.interceptor.Fault; +import org.apache.cxf.interceptor.OutFaultChainInitiatorObserver; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.message.Message; +import org.apache.cxf.phase.AbstractPhaseInterceptor; +import org.apache.cxf.phase.Phase; +import org.apache.cxf.transport.MessageObserver; /** * A Consumer of exchanges for a JAXRS service in CXF. CxfRsConsumer acts a CXF @@ -41,12 +48,52 @@ public class CxfRsConsumer extends DefaultConsumer { CxfRsInvoker cxfRsInvoker = new CxfRsInvoker(endpoint, this); JAXRSServerFactoryBean svrBean = endpoint.createJAXRSServerFactoryBean(); Bus bus = endpoint.getBus(); + // We need to apply the bus setting from the CxfRsEndpoint which does not use the default bus if (bus != null) { svrBean.setBus(bus); + } + svrBean.setInvoker(cxfRsInvoker); - return svrBean.create(); + + svrBean.getOutInterceptors().add(new UnitOfWorkCloserInterceptor()); + + + Server server = svrBean.create(); + + final MessageObserver originalOutFaultObserver = server.getEndpoint().getOutFaultObserver(); + //proxy OutFaultObserver so we can close org.apache.camel.spi.UnitOfWork in case of error + server.getEndpoint().setOutFaultObserver(message -> { + org.apache.cxf.message.Exchange cxfExchange = null; + if ((cxfExchange = message.getExchange()) != null) { + org.apache.camel.Exchange exchange = cxfExchange.get(org.apache.camel.Exchange.class); + if (exchange != null) { + doneUoW(exchange); + } + } + originalOutFaultObserver.onMessage(message); + }); + + return server; + } + + //closes UnitOfWork in good case + private class UnitOfWorkCloserInterceptor extends AbstractPhaseInterceptor<Message> { + public UnitOfWorkCloserInterceptor() { + super(Phase.POST_LOGICAL_ENDING); + } + + @Override + public void handleMessage(Message message) throws Fault { + org.apache.cxf.message.Exchange cxfExchange = null; + if ((cxfExchange = message.getExchange()) != null) { + org.apache.camel.Exchange exchange = cxfExchange.get(org.apache.camel.Exchange.class); + if (exchange != null) { + doneUoW(exchange); + } + } + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java index 01563d3..29d9fa3 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java @@ -109,8 +109,9 @@ public class CxfRsInvoker extends JAXRSInvoker { org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); try { return returnResponse(cxfExchange, camelExchange); - } finally { + } catch (Exception ex) { cxfRsConsumer.doneUoW(camelExchange); + throw ex; } } else { if (!continuation.isPending()) { @@ -119,8 +120,9 @@ public class CxfRsInvoker extends JAXRSInvoker { camelExchange.setException(new ExchangeTimedOutException(camelExchange, endpoint.getContinuationTimeout())); try { return returnResponse(cxfExchange, camelExchange); - } finally { + } catch (Exception ex) { cxfRsConsumer.doneUoW(camelExchange); + throw ex; } } } @@ -143,8 +145,9 @@ public class CxfRsInvoker extends JAXRSInvoker { try { return returnResponse(cxfExchange, camelExchange); - } finally { + } catch (Exception ex) { cxfRsConsumer.doneUoW(camelExchange); + throw ex; } } @@ -155,6 +158,9 @@ public class CxfRsInvoker extends JAXRSInvoker { ep = ExchangePattern.InOnly; } final org.apache.camel.Exchange camelExchange = endpoint.createExchange(ep); + //needs access in MessageObserver/Interceptor to close the UnitOfWork + cxfExchange.put(org.apache.camel.Exchange.class, camelExchange); + if (response != null) { camelExchange.getOut().setBody(response); } http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedSynchronousTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedSynchronousTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedSynchronousTest.java new file mode 100644 index 0000000..c9707d9 --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedSynchronousTest.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf; + + + + +public class CxfConsumerClientDisconnectedSynchronousTest extends CxfConsumerClientDisconnectedTest { + + protected boolean isSynchronous() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java new file mode 100644 index 0000000..aba3ccc --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf; + +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; + +import javax.ws.rs.core.Response; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.commons.net.telnet.TelnetClient; +import org.junit.Test; + +/** + * UnitOfWork should complete even if client disconnected during the processing. + */ +public class CxfConsumerClientDisconnectedTest extends CamelTestSupport { + private static final int PORT = CXFTestSupport.getPort1(); + private static final String CONTEXT = "/CxfConsumerClientDisconnectedTest"; + private static final String CXT = PORT + CONTEXT; + + private String cxfRsEndpointUri = "cxf://http://localhost:" + CXT + "/rest?synchronous=" + isSynchronous() + + "&serviceClass=org.apache.camel.component.cxf.ServiceProvider&dataFormat=PAYLOAD"; + + protected RouteBuilder createRouteBuilder() throws Exception { + + return new RouteBuilder() { + public void configure() { + + getContext().setStreamCaching(true); + getContext().getStreamCachingStrategy().setSpoolThreshold(1L); + errorHandler(noErrorHandler()); + + Response ok = Response.ok().build(); + + from(cxfRsEndpointUri) + // should be able to convert to Customer + .to("mock:result") + .process(exchange-> { + Thread.sleep(100); + + exchange.addOnCompletion(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + template.sendBody("mock:onComplete", ""); + } + + @Override + public void onFailure(Exchange exchange) { + + } + }); + }); + + }; + }; + } + + @Test + public void testClientDisconnect() throws Exception { + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + MockEndpoint onComplete = getMockEndpoint("mock:onComplete"); + onComplete.expectedMessageCount(1); + + TelnetClient telnetClient = new TelnetClient(); + + telnetClient.connect("localhost", PORT); + telnetClient.setTcpNoDelay(true); + telnetClient.setReceiveBufferSize(1); + + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(telnetClient.getOutputStream())); + writer.write("GET " + CONTEXT + "/rest/customerservice/customers HTTP/1.1\nhost: localhost\n\n"); + writer.flush(); + telnetClient.disconnect(); + mock.assertIsSatisfied(); + onComplete.assertIsSatisfied(); + + } + + protected boolean isSynchronous() { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheSynchronousTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheSynchronousTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheSynchronousTest.java new file mode 100644 index 0000000..a50837c --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheSynchronousTest.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.cxf; + +public class CxfConsumerStreamCacheSynchronousTest extends CxfConsumerStreamCacheTest { + + @Override + protected boolean isSynchronous() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java new file mode 100644 index 0000000..07e12fc --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.cxf; + +import org.w3c.dom.Node; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.converter.stream.CachedOutputStream; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.test.junit4.CamelTestSupport; + +import org.junit.Test; + + +//Modified from https://issues.apache.org/jira/secure/attachment/12730161/0001-CAMEL-8419-Camel-StreamCache-does-not-work-with-CXF-.patch +public class CxfConsumerStreamCacheTest extends CamelTestSupport { + + protected static final String REQUEST_MESSAGE = "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:ser=\"test/service\">" + + "<soapenv:Header/><soapenv:Body><ser:ping/></soapenv:Body></soapenv:Envelope>"; + + protected static final String RESPONSE_MESSAGE_BEGINE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">" + + "<soap:Body><pong xmlns=\"test/service\""; + protected static final String RESPONSE_MESSAGE_END = "/></soap:Body></soap:Envelope>"; + + protected static final String RESPONSE = "<pong xmlns=\"test/service\"/>"; + + protected final String simpleEndpointAddress = "http://localhost:" + + CXFTestSupport.getPort1() + "/" + getClass().getSimpleName() + "/test"; + protected final String simpleEndpointURI = "cxf://" + simpleEndpointAddress + + "?synchronous=" + isSynchronous() + "&serviceClass=org.apache.camel.component.cxf.ServiceProvider&dataFormat=PAYLOAD"; + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + getContext().setStreamCaching(true); + getContext().getStreamCachingStrategy().setSpoolThreshold(1L); + errorHandler(noErrorHandler()); + from(getFromEndpointUri()).process(new Processor() { + public void process(final Exchange exchange) throws Exception { + Message in = exchange.getIn(); + Node node = in.getBody(Node.class); + assertNotNull(node); + CachedOutputStream cos = new CachedOutputStream(exchange); + cos.write(RESPONSE.getBytes("UTF-8")); + cos.close(); + exchange.getOut().setBody(cos.newStreamCache()); + + exchange.addOnCompletion(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + template.sendBody("mock:onComplete", ""); + } + + @Override + public void onFailure(Exchange exchange) { + + } + }); + } + }); + } + }; + } + + @Test + public void testInvokingServiceFromHttpCompnent() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:onComplete"); + mock.expectedMessageCount(2); + + // call the service with right post message + + String response = template.requestBody(simpleEndpointAddress, REQUEST_MESSAGE, String.class); + assertTrue("Get a wrong response ", response.startsWith(RESPONSE_MESSAGE_BEGINE)); + assertTrue("Get a wrong response ", response.endsWith(RESPONSE_MESSAGE_END)); + try { + template.requestBody(simpleEndpointAddress, null, String.class); + fail("Excpetion to get exception here"); + } catch (Exception ex) { + // do nothing here + } + + response = template.requestBody(simpleEndpointAddress, REQUEST_MESSAGE, String.class); + assertTrue("Get a wrong response ", response.startsWith(RESPONSE_MESSAGE_BEGINE)); + assertTrue("Get a wrong response ", response.endsWith(RESPONSE_MESSAGE_END)); + mock.assertIsSatisfied(); + } + + protected String getFromEndpointUri() { + return simpleEndpointURI; + } + + protected boolean isSynchronous() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedSynchronousTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedSynchronousTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedSynchronousTest.java new file mode 100644 index 0000000..8c58f1d --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedSynchronousTest.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf.jaxrs; + +public class CxfRsConsumerClientDisconnectedSynchronousTest extends CxfRsConsumerClientDisconnectedTest { + + protected boolean isSynchronous() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java new file mode 100644 index 0000000..4a4a47c --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf.jaxrs; + +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; + +import javax.ws.rs.core.Response; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cxf.CXFTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.commons.net.telnet.TelnetClient; +import org.junit.Test; + +/** + * UnitOfWork should complete even if client disconnected during the processing. + */ +public class CxfRsConsumerClientDisconnectedTest extends CamelTestSupport { + private static final int PORT = CXFTestSupport.getPort1(); + private static final String CONTEXT = "/CxfRsConsumerClientDisconnectedTest"; + private static final String CXT = PORT + CONTEXT; + + private String cxfRsEndpointUri = "cxfrs://http://localhost:" + CXT + "/rest?synchronous=" + isSynchronous() + + "&dataFormat=PAYLOAD&resourceClasses=org.apache.camel.component.cxf.jaxrs.testbean.CustomerService"; + + protected RouteBuilder createRouteBuilder() throws Exception { + + return new RouteBuilder() { + public void configure() { + + getContext().setStreamCaching(true); + getContext().getStreamCachingStrategy().setSpoolThreshold(1L); + errorHandler(noErrorHandler()); + + Response ok = Response.ok().build(); + + from(cxfRsEndpointUri) + // should be able to convert to Customer + .to("mock:result") + .process(exchange-> { + Thread.sleep(100); + + exchange.addOnCompletion(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + template.sendBody("mock:onComplete", ""); + } + + @Override + public void onFailure(Exchange exchange) { + + } + }); + }); + + }; + }; + } + + @Test + public void testClientDisconnect() throws Exception { + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + MockEndpoint onComplete = getMockEndpoint("mock:onComplete"); + onComplete.expectedMessageCount(1); + + TelnetClient telnetClient = new TelnetClient(); + + telnetClient.connect("localhost", PORT); + telnetClient.setTcpNoDelay(true); + telnetClient.setReceiveBufferSize(1); + + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(telnetClient.getOutputStream())); + writer.write("GET " + CONTEXT + "/rest/customerservice/customers HTTP/1.1\nhost: localhost\n\n"); + writer.flush(); + telnetClient.disconnect(); + mock.assertIsSatisfied(); + onComplete.assertIsSatisfied(); + + + + } + + protected boolean isSynchronous() { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheSynchronousTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheSynchronousTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheSynchronousTest.java new file mode 100644 index 0000000..99a2925 --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheSynchronousTest.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf.jaxrs; + +public class CxfRsStreamCacheSynchronousTest extends CxfRsStreamCacheTest { + + @Override + protected boolean isSynchronous() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e9091d3f/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java new file mode 100644 index 0000000..7a97b40 --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf.jaxrs; + +import javax.ws.rs.core.Response; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cxf.CXFTestSupport; +import org.apache.camel.component.cxf.jaxrs.testbean.Customer; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.converter.stream.CachedOutputStream; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.junit.Test; + + +public class CxfRsStreamCacheTest extends CamelTestSupport { + private static final String PUT_REQUEST = "<Customer><name>Mary</name><id>123</id></Customer>"; + private static final String CONTEXT = "/CxfRsStreamCacheTest"; + private static final String CXT = CXFTestSupport.getPort1() + CONTEXT; + private static final String RESPONSE = "<pong xmlns=\"test/service\"/>"; + + private String cxfRsEndpointUri = "cxfrs://http://localhost:" + CXT + "/rest?synchronous=" + isSynchronous() + + "&dataFormat=PAYLOAD&resourceClasses=org.apache.camel.component.cxf.jaxrs.testbean.CustomerService"; + + protected RouteBuilder createRouteBuilder() throws Exception { + + return new RouteBuilder() { + public void configure() { + + getContext().setStreamCaching(true); + getContext().getStreamCachingStrategy().setSpoolThreshold(1L); + errorHandler(noErrorHandler()); + + Response ok = Response.ok().build(); + + from(cxfRsEndpointUri) + // should be able to convert to Customer + .convertBodyTo(Customer.class) + .to("mock:result") + .process(exchange-> { + // respond with OK + CachedOutputStream cos = new CachedOutputStream(exchange); + cos.write(RESPONSE.getBytes("UTF-8")); + cos.close(); + exchange.getOut().setBody(cos.newStreamCache()); + + exchange.addOnCompletion(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + template.sendBody("mock:onComplete", ""); + } + + @Override + public void onFailure(Exchange exchange) { + + } + }); + }); + + }; + }; + } + + @Test + public void testPutConsumer() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.message(0).body().isInstanceOf(Customer.class); + + MockEndpoint onComplete = getMockEndpoint("mock:onComplete"); + onComplete.expectedMessageCount(1); + + + HttpPut put = new HttpPut("http://localhost:" + CXT + "/rest/customerservice/customers"); + StringEntity entity = new StringEntity(PUT_REQUEST, "ISO-8859-1"); + entity.setContentType("text/xml; charset=ISO-8859-1"); + put.addHeader("test", "header1;header2"); + put.setEntity(entity); + CloseableHttpClient httpclient = HttpClientBuilder.create().build(); + + try { + HttpResponse response = httpclient.execute(put); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertEquals(RESPONSE, EntityUtils.toString(response.getEntity())); + } finally { + httpclient.close(); + } + + mock.assertIsSatisfied(); + onComplete.assertIsSatisfied(); + + } + + protected boolean isSynchronous() { + return false; + } + +}