Author: ningjiang Date: Thu Jul 22 14:10:42 2010 New Revision: 966670 URL: http://svn.apache.org/viewvc?rev=966670&view=rev Log: CAMEL-2898 CXF component supports non blocking routing engine
Added: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java (with props) Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java camel/trunk/parent/pom.xml Added: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java?rev=966670&view=auto ============================================================================== --- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java (added) +++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java Thu Jul 22 14:10:42 2010 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.cxf; + +import java.util.Map; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.cxf.endpoint.Client; +import org.apache.cxf.endpoint.ClientCallback; +import org.apache.cxf.service.model.BindingOperationInfo; + +public class CxfClientCallback extends ClientCallback { + private final AsyncCallback camelAsyncCallback; + private final Exchange camelExchange; + private final org.apache.cxf.message.Exchange cxfExchange; + private final BindingOperationInfo boi; + private final CxfEndpoint endpoint; + + public CxfClientCallback(AsyncCallback callback, + Exchange camelExchange, + org.apache.cxf.message.Exchange cxfExchange, + BindingOperationInfo boi, + CxfEndpoint endpoint) { + this.camelAsyncCallback = callback; + this.camelExchange = camelExchange; + this.cxfExchange = cxfExchange; + this.boi = boi; + this.endpoint = endpoint; + } + + public void handleResponse(Map<String, Object> ctx, Object[] res) { + super.handleResponse(ctx, res); + // bind the CXF response to Camel exchange + if (!boi.getOperationInfo().isOneWay()) { + // copy the InMessage header to OutMessage header + camelExchange.getOut().getHeaders().putAll(camelExchange.getIn().getHeaders()); + endpoint.getCxfBinding().populateExchangeFromCxfResponse(camelExchange, cxfExchange, + ctx); + } + camelAsyncCallback.done(false); + } + + public void handleException(Map<String, Object> ctx, Throwable ex) { + super.handleException(ctx, ex); + camelExchange.setException(ex); + camelAsyncCallback.done(false); + } + + +} Propchange: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java ------------------------------------------------------------------------------ svn:keywords = 
Rev Date Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?rev=966670&r1=966669&r2=966670&view=diff ============================================================================== --- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java (original) +++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java Thu Jul 22 14:10:42 2010 @@ -27,6 +27,8 @@ import javax.xml.namespace.QName; import javax.xml.ws.Holder; import javax.xml.ws.handler.MessageContext.Scope; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.DefaultProducer; @@ -48,9 +50,10 @@ import org.apache.cxf.service.model.Bind * * @version $Revision$ */ -public class CxfProducer extends DefaultProducer { +public class CxfProducer extends DefaultProducer implements AsyncProcessor { private static final Log LOG = LogFactory.getLog(CxfProducer.class); private Client client; + private CxfEndpoint endpoint; /** * Constructor to create a CxfProducer. 
It will create a CXF client @@ -62,8 +65,42 @@ public class CxfProducer extends Default */ public CxfProducer(CxfEndpoint endpoint) throws Exception { super(endpoint); + this.endpoint = endpoint; client = endpoint.createClient(); } + + // The CXF client's async and sync APIs are implemented differently, + // so we don't delegate the sync process call to the async process + public boolean process(Exchange camelExchange, AsyncCallback callback) { + if (LOG.isTraceEnabled()) { + LOG.trace("Process exchange: " + camelExchange); + } + + try { + // create CXF exchange + ExchangeImpl cxfExchange = new ExchangeImpl(); + + // prepare binding operation info + BindingOperationInfo boi = prepareBindingOperation(camelExchange, cxfExchange); + + Map<String, Object> invocationContext = new HashMap<String, Object>(); + Map<String, Object> responseContext = new HashMap<String, Object>(); + invocationContext.put(Client.RESPONSE_CONTEXT, responseContext); + invocationContext.put(Client.REQUEST_CONTEXT, prepareRequest(camelExchange, cxfExchange)); + + CxfClientCallback cxfClientCallback = new CxfClientCallback(callback, camelExchange, cxfExchange, boi, endpoint); + // send the CXF async request + client.invoke(cxfClientCallback, boi, getParams(endpoint, camelExchange), + invocationContext, cxfExchange); + } catch (Exception ex) { + // error occurred before we had a chance to go async + // so set exception and invoke callback true + camelExchange.setException(ex); + callback.done(true); + return true; + } + return false; + } /** * This processor binds Camel exchange to a CXF exchange and @@ -78,15 +115,32 @@ public class CxfProducer extends Default // create CXF exchange ExchangeImpl cxfExchange = new ExchangeImpl(); - // get CXF binding - CxfEndpoint endpoint = (CxfEndpoint)getEndpoint(); - CxfBinding binding = endpoint.getCxfBinding(); + // prepare binding operation info + BindingOperationInfo boi = prepareBindingOperation(camelExchange, cxfExchange); + + Map<String, Object> 
invocationContext = new HashMap<String, Object>(); + Map<String, Object> responseContext = new HashMap<String, Object>(); + invocationContext.put(Client.RESPONSE_CONTEXT, responseContext); + invocationContext.put(Client.REQUEST_CONTEXT, prepareRequest(camelExchange, cxfExchange)); + + // send the CXF request + client.invoke(boi, getParams(endpoint, camelExchange), + invocationContext, cxfExchange); + + // bind the CXF response to Camel exchange + if (!boi.getOperationInfo().isOneWay()) { + // copy the InMessage header to OutMessage header + camelExchange.getOut().getHeaders().putAll(camelExchange.getIn().getHeaders()); + endpoint.getCxfBinding().populateExchangeFromCxfResponse(camelExchange, cxfExchange, + responseContext); + } + } + + protected Map<String, Object> prepareRequest(Exchange camelExchange, org.apache.cxf.message.Exchange cxfExchange) throws Exception { // create invocation context WrappedMessageContext requestContext = new WrappedMessageContext( new HashMap<String, Object>(), null, Scope.APPLICATION); - Map<String, Object> responseContext = new HashMap<String, Object>(); - // set data format mode in exchange DataFormat dataFormat = endpoint.getDataFormat(); @@ -107,13 +161,26 @@ public class CxfProducer extends Default + "=" + true); } } + + // bind the request CXF exchange + endpoint.getCxfBinding().populateCxfRequestFromExchange(cxfExchange, camelExchange, + requestContext); + + // Remove protocol headers from scopes. Otherwise, response headers can be + // overwritten by request headers when SOAPHandlerInterceptor tries to create + // a wrapped message context by the copyScoped() method. 
+ requestContext.getScopes().remove(Message.PROTOCOL_HEADERS); + return requestContext.getWrappedMap(); + } + + private BindingOperationInfo prepareBindingOperation(Exchange camelExchange, org.apache.cxf.message.Exchange cxfExchange) { // get binding operation info BindingOperationInfo boi = getBindingOperationInfo(camelExchange); ObjectHelper.notNull(boi, "BindingOperationInfo"); // keep the message wrapper in PAYLOAD mode - if (dataFormat == DataFormat.PAYLOAD && boi.isUnwrapped()) { + if (endpoint.getDataFormat() == DataFormat.PAYLOAD && boi.isUnwrapped()) { boi = boi.getWrappedOperation(); cxfExchange.put(BindingOperationInfo.class, boi); @@ -126,7 +193,7 @@ public class CxfProducer extends Default } // Unwrap boi before passing it to make a client call - if (dataFormat != DataFormat.PAYLOAD && !endpoint.isWrapped() && boi != null) { + if (endpoint.getDataFormat() != DataFormat.PAYLOAD && !endpoint.isWrapped() && boi != null) { if (boi.isUnwrappedCapable()) { boi = boi.getUnwrappedOperation(); if (LOG.isTraceEnabled()) { @@ -134,31 +201,7 @@ public class CxfProducer extends Default } } } - - // bind the request CXF exchange - binding.populateCxfRequestFromExchange(cxfExchange, camelExchange, - requestContext); - - // Remove protocol headers from scopes. Otherwise, response headers can be - // overwritten by request headers when SOAPHandlerInterceptor tries to create - // a wrapped message context by the copyScoped() method. 
- requestContext.getScopes().remove(Message.PROTOCOL_HEADERS); - - Map<String, Object> invocationContext = new HashMap<String, Object>(); - invocationContext.put(Client.RESPONSE_CONTEXT, responseContext); - invocationContext.put(Client.REQUEST_CONTEXT, requestContext.getWrappedMap()); - - // send the CXF request - client.invoke(boi, getParams(endpoint, camelExchange), - invocationContext, cxfExchange); - - // bind the CXF response to Camel exchange - if (!boi.getOperationInfo().isOneWay()) { - // copy the InMessage header to OutMessage header - camelExchange.getOut().getHeaders().putAll(camelExchange.getIn().getHeaders()); - binding.populateExchangeFromCxfResponse(camelExchange, cxfExchange, - responseContext); - } + return boi; } private void checkParameterSize(CxfEndpoint endpoint, Exchange exchange, Object[] parameters) { Modified: camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java?rev=966670&r1=966669&r2=966670&view=diff ============================================================================== --- camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java (original) +++ camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java Thu Jul 22 14:10:42 2010 @@ -28,6 +28,7 @@ import org.junit.Test; public class CxfRawMessageRouterTest extends CxfSimpleRouterTest { private String routerEndpointURI = "cxf://" + ROUTER_ADDRESS + "?" + SERVICE_CLASS + "&dataFormat=MESSAGE"; private String serviceEndpointURI = "cxf://" + SERVICE_ADDRESS + "?" 
+ SERVICE_CLASS + "&dataFormat=MESSAGE"; + protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { @@ -60,6 +61,7 @@ public class CxfRawMessageRouterTest ext + "<soap:Body><ns1:echo xmlns:ns1=\"http://cxf.component.camel.apache.org/\">" + "<arg0 xmlns=\"http://cxf.component.camel.apache.org/\">hello world</arg0>" + "</ns1:echo></soap:Body></soap:Envelope>"); + } }); Modified: camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java?rev=966670&r1=966669&r2=966670&view=diff ============================================================================== --- camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java (original) +++ camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java Thu Jul 22 14:10:42 2010 @@ -60,11 +60,11 @@ public class CxfTimeoutTest extends Came protected void sendTimeOutMessage(String endpointUri) throws Exception { Exchange reply = sendJaxWsMessage(endpointUri); Exception e = reply.getException(); - assertNotNull("We should get the exception cause here", e.getCause()); - assertTrue("We should get the socket time out exception here", e.getCause().getCause() instanceof SocketTimeoutException); + assertNotNull("We should get the exception cause here", e); + assertTrue("We should get the socket time out exception here", e instanceof SocketTimeoutException); } - protected Exchange sendJaxWsMessage(String endpointUri) { + protected Exchange sendJaxWsMessage(String endpointUri) throws InterruptedException { Exchange exchange = template.send(endpointUri, new Processor() { public void process(final Exchange exchange) { final List<String> params = new ArrayList<String>(); Modified: camel/trunk/parent/pom.xml URL: 
http://svn.apache.org/viewvc/camel/trunk/parent/pom.xml?rev=966670&r1=966669&r2=966670&view=diff ============================================================================== --- camel/trunk/parent/pom.xml (original) +++ camel/trunk/parent/pom.xml Thu Jul 22 14:10:42 2010 @@ -48,7 +48,7 @@ <commons-collections-version>3.2.1</commons-collections-version> <commons-pool-version>1.5.4</commons-pool-version> <commons-dbcp-version>1.3</commons-dbcp-version> - <cxf-version>2.2.9</cxf-version> + <cxf-version>2.2.10-SNAPSHOT</cxf-version> <derby-version>10.4.2.0</derby-version> <dozer-version>5.2.2</dozer-version> <easymock-version>2.5.2</easymock-version>