CAMEL-6609 Fixed the issue where the CXF FailoverFeature does not take effect when the camel-cxf producer uses async invocation
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af76623d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af76623d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af76623d Branch: refs/heads/camel-2.10.x Commit: af76623dde24f0f3086008fad35003d19bfeb6e3 Parents: 9e7f2c6 Author: Willem Jiang <ningji...@apache.org> Authored: Tue Aug 6 10:27:52 2013 +0800 Committer: Willem Jiang <ningji...@apache.org> Committed: Tue Aug 6 19:25:07 2013 +0800 ---------------------------------------------------------------------- components/camel-cxf/pom.xml | 7 + .../camel/component/cxf/CxfClientCallback.java | 26 +++- .../apache/camel/component/cxf/CxfProducer.java | 2 +- .../component/cxf/FailOverFeatureTest.java | 142 +++++++++++++++++++ 4 files changed, 175 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/af76623d/components/camel-cxf/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-cxf/pom.xml b/components/camel-cxf/pom.xml index 2eeec6b..c8a99e5 100644 --- a/components/camel-cxf/pom.xml +++ b/components/camel-cxf/pom.xml @@ -158,6 +158,13 @@ <scope>test</scope> </dependency> + <!-- test for cxf failover feature --> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-features-clustering</artifactId> + <version>${cxf-version}</version> + </dependency> + <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-transports-jms</artifactId> http://git-wip-us.apache.org/repos/asf/camel/blob/af76623d/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java index 2e6e60a..a6f936f 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java @@ -20,7 +20,9 @@ import java.util.Map; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.cxf.endpoint.Client; import org.apache.cxf.endpoint.ClientCallback; +import org.apache.cxf.endpoint.ClientImpl; import org.apache.cxf.service.model.BindingOperationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,15 +35,25 @@ public class CxfClientCallback extends ClientCallback { private final org.apache.cxf.message.Exchange cxfExchange; private final BindingOperationInfo boi; private final CxfBinding binding; + private final Client client; public CxfClientCallback(AsyncCallback callback, Exchange camelExchange, org.apache.cxf.message.Exchange cxfExchange, BindingOperationInfo boi, CxfBinding binding) { + this(null, callback, camelExchange, cxfExchange, boi, binding); + } + + public CxfClientCallback(Client client, AsyncCallback callback, + Exchange camelExchange, + org.apache.cxf.message.Exchange cxfExchange, + BindingOperationInfo boi, + CxfBinding binding) { this.camelAsyncCallback = callback; this.camelExchange = camelExchange; this.cxfExchange = cxfExchange; + this.client = client; this.boi = boi; this.binding = binding; } @@ -69,7 +81,19 @@ public class CxfClientCallback extends ClientCallback { public void handleException(Map<String, Object> ctx, Throwable ex) { try { super.handleException(ctx, ex); - camelExchange.setException(ex); + // need to call the conduitSelector complete method to enable the fail over feature + if (client instanceof ClientImpl) { + ((ClientImpl)client).getConduitSelector().complete(cxfExchange); + ex = cxfExchange.getOutMessage().getContent(Exception.class); + if (ex == null && 
cxfExchange.getInMessage() != null) { + ex = cxfExchange.getInMessage().getContent(Exception.class); + } + if (ex != null) { + camelExchange.setException(ex); + } + } else { + camelExchange.setException(ex); + } } finally { // copy the context information and // call camel callback http://git-wip-us.apache.org/repos/asf/camel/blob/af76623d/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java index 28b5d12..dbbe264 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java @@ -106,7 +106,7 @@ public class CxfProducer extends DefaultProducer implements AsyncProcessor { invocationContext.put(Client.RESPONSE_CONTEXT, responseContext); invocationContext.put(Client.REQUEST_CONTEXT, prepareRequest(camelExchange, cxfExchange)); - CxfClientCallback cxfClientCallback = new CxfClientCallback(callback, camelExchange, cxfExchange, boi, + CxfClientCallback cxfClientCallback = new CxfClientCallback(client, callback, camelExchange, cxfExchange, boi, endpoint.getCxfBinding()); // send the CXF async request client.invoke(cxfClientCallback, boi, getParams(endpoint, camelExchange), http://git-wip-us.apache.org/repos/asf/camel/blob/af76623d/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java new file mode 100644 index 0000000..4b7239a --- /dev/null +++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf; + +import java.util.ArrayList; +import java.util.List; +import javax.xml.ws.Endpoint; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cxf.CxfEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.cxf.clustering.FailoverFeature; +import org.apache.cxf.clustering.RandomStrategy; +import org.apache.cxf.frontend.ClientProxyFactoryBean; +import org.apache.cxf.frontend.ServerFactoryBean; +import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class FailOverFeatureTest { + + private static int port1 = CXFTestSupport.getPort1(); + private static int port2 = CXFTestSupport.getPort2(); + private static int port3 = CXFTestSupport.getPort3(); + private static int port4 = AvailablePortFinder.getNextAvailable(); + + private static final String SERVICE_ADDRESS = "http://localhost:" + port1 + "/FailOverFeatureTest"; 
+ private static final String PAYLOAD_PROXY_ADDRESS = "http://localhost:" + port2 + "/FailOverFeatureTest/proxy"; + private static final String POJO_PROXY_ADDRESS = "http://localhost:" + port3 + "/FailOverFeatureTest/proxy"; + private static final String NONE_EXIST_ADDRESS = "http://localhost:" + port4 + "/FailOverFeatureTest"; + private DefaultCamelContext context1; + private DefaultCamelContext context2; + + @BeforeClass + public static void init() { + + // publish a web-service + String addr1 = "http://localhost:9001/hello"; + ServerFactoryBean factory = new ServerFactoryBean(); + factory.setAddress(SERVICE_ADDRESS); + factory.setServiceBean(new HelloServiceImpl()); + factory.create(); + } + + @Test + public void testPojo() throws Exception { + startRoutePojo(); + Assert.assertEquals("hello", tryFailover(POJO_PROXY_ADDRESS)); + if (context2 != null) { + context2.stop(); + } + } + + @Test + public void testPayload() throws Exception { + startRoutePayload(); + Assert.assertEquals("hello", tryFailover(PAYLOAD_PROXY_ADDRESS)); + if (context1 != null) { + context1.stop(); + } + } + + private void startRoutePayload() throws Exception { + + String proxy = "cxf://" + PAYLOAD_PROXY_ADDRESS + "?wsdlURL=" + SERVICE_ADDRESS + "?wsdl" + + "&dataFormat=PAYLOAD"; + + // use a non-exists address to trigger fail-over + // another problem is: if synchronous=false fail-over will not happen + String real = "cxf://" + NONE_EXIST_ADDRESS + "?wsdlURL=" + SERVICE_ADDRESS + "?wsdl" + + "&dataFormat=PAYLOAD"; + + context1 = new DefaultCamelContext(); + startRoute(context1, proxy, real); + } + + private void startRoutePojo() throws Exception { + + String proxy = "cxf://" + POJO_PROXY_ADDRESS + "?serviceClass=" + "org.apache.camel.component.cxf.HelloService" + + "&dataFormat=POJO"; + + // use a non-exists address to trigger fail-over + String real = "cxf://" + NONE_EXIST_ADDRESS + "?serviceClass=" + "org.apache.camel.component.cxf.HelloService" + + "&dataFormat=POJO"; + + context2 = new 
DefaultCamelContext(); + startRoute(context2, proxy, real); + } + + private void startRoute(DefaultCamelContext ctx, final String proxy, final String real) throws Exception { + + ctx.addRoutes(new RouteBuilder() { + public void configure() { + String alt = SERVICE_ADDRESS; + + List<String> serviceList = new ArrayList<String>(); + serviceList.add(alt); + + RandomStrategy strategy = new RandomStrategy(); + strategy.setAlternateAddresses(serviceList); + + FailoverFeature ff = new FailoverFeature(); + ff.setStrategy(strategy); + + CxfEndpoint endpoint = (CxfEndpoint)(endpoint(real)); + endpoint.getFeatures().add(ff); + + from(proxy).to(endpoint); + } + }); + ctx.start(); + + } + + private String tryFailover(String url) { + + ClientProxyFactoryBean factory = new ClientProxyFactoryBean(); + + factory.setServiceClass(HelloService.class); + factory.setAddress(url); + + HelloService client = (HelloService)factory.create(); + return client.sayHello(); + } + +}