Add async support for camel-cxf JAX-RS producer

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/208e21c5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/208e21c5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/208e21c5

Branch: refs/heads/master
Commit: 208e21c5a65ad2b561e8856f0a22949d7e7c4b80
Parents: 0b20682
Author: jpoth <poth.j...@gmail.com>
Authored: Tue Nov 22 19:07:26 2016 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Dec 21 18:00:48 2016 +0100

----------------------------------------------------------------------
 .../camel/component/cxf/jaxrs/CxfRsBinding.java |  12 +
 .../component/cxf/jaxrs/CxfRsEndpoint.java      |   8 +-
 .../component/cxf/jaxrs/CxfRsProducer.java      | 352 ++++++++++++-
 .../cxf/jaxrs/DefaultCxfRsBinding.java          |  16 +
 .../jaxrs/CxfRsAsyncProducerSessionTest.java    | 112 +++++
 .../cxf/jaxrs/CxfRsAsyncProducerTest.java       | 490 +++++++++++++++++++
 .../cxf/jaxrs/CxfRsAsyncRelayTest.java          | 147 ++++++
 .../CxfRsBindingConfigurationSelectionTest.java |   6 +
 .../component/cxf/jaxrs/CxfRsProducerTest.java  |  20 +-
 .../cxf/jaxrs/CxfRsSslAsyncProducerTest.java    |  99 ++++
 .../cxf/jaxrs/CxfRsSpringAsyncProducer.xml      |  71 +++
 .../jaxrs/CxfRsSpringAsyncProducerSession.xml   |  90 ++++
 .../cxf/jaxrs/CxfRsSpringAsyncRelay.xml         |  47 ++
 .../component/cxf/jaxrs/CxfRsSpringProducer.xml |   4 +-
 .../CxfRsSpringProducerAddressOverride.xml      |   4 +-
 .../cxf/jaxrs/CxfRsSpringProducerSession.xml    |  24 +-
 .../component/cxf/jaxrs/CxfRsSpringRelay.xml    |   2 +-
 .../component/cxf/jaxrs/CxfRsSpringRouter.xml   |   2 +-
 .../cxf/jaxrs/CxfRsSpringSslAsyncProducer.xml   |  94 ++++
 .../cxf/jaxrs/CxfRsSpringSslProducer.xml        |   8 +-
 20 files changed, 1558 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsBinding.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsBinding.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsBinding.java
index 50ca905..17b68ca 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsBinding.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsBinding.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.cxf.jaxrs;
 import java.lang.reflect.Method;
 import java.util.Map;
 
+import javax.ws.rs.client.AsyncInvoker;
+import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MultivaluedMap;
 
 import org.apache.cxf.message.Exchange;
@@ -104,4 +106,14 @@ public interface CxfRsBinding {
                                                           
org.apache.camel.Exchange camelExchange)
         throws Exception;
 
+    /**
+     * Bind the Camel message to a request {@link Entity} that gets passed to 
{@link AsyncInvoker#method(java.lang.String, javax.ws.rs.client.Entity, 
javax.ws.rs.client.InvocationCallback)}.
+     *
+     * @param camelMessage  the source message
+     * @param camelExchange the Camel exchange
+     * @param body the message body
+     * @throws Exception can be thrown if error in the binding process
+     * @return the {@link Entity} to use
+     */
+    Entity<Object> bindCamelMessageToRequestEntity(Object body, 
org.apache.camel.Message camelMessage, org.apache.camel.Exchange camelExchange) 
throws Exception ;
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsEndpoint.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsEndpoint.java
index 3435d20..822a07c 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsEndpoint.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsEndpoint.java
@@ -34,6 +34,7 @@ import org.apache.camel.Service;
 import org.apache.camel.component.cxf.NullFaultListener;
 import org.apache.camel.http.common.cookie.CookieHandler;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.spi.UriEndpoint;
@@ -195,7 +196,12 @@ public class CxfRsEndpoint extends DefaultEndpoint 
implements HeaderFilterStrate
         if (bindingStyle == BindingStyle.SimpleConsumer) {
             throw new IllegalArgumentException("The SimpleConsumer Binding 
Style cannot be used in a camel-cxfrs producer");
         }
-        return new CxfRsProducer(this);
+        final CxfRsProducer cxfRsProducer = new CxfRsProducer(this);
+        if (isSynchronous()) {
+            return new SynchronousDelegateProducer(cxfRsProducer);
+        } else {
+            return cxfRsProducer;
+        }
     }
 
     public boolean isSingleton() {

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index 8ff54c3..50aece0 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -30,9 +30,16 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.client.ResponseProcessingException;
+import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.NewCookie;
 import javax.ws.rs.core.Response;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -57,7 +64,7 @@ import org.slf4j.LoggerFactory;
  * JAXRS client, it will turn the normal Object invocation to a RESTful request
  * according to resource annotation.  Any response will be bound to Camel 
exchange.
  */
-public class CxfRsProducer extends DefaultProducer {
+public class CxfRsProducer extends DefaultProducer implements AsyncProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CxfRsProducer.class);
 
@@ -96,7 +103,122 @@ public class CxfRsProducer extends DefaultProducer {
             invokeProxyClient(exchange);
         }
     }
-    
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            Message inMessage = exchange.getIn();
+            Boolean httpClientAPI = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.class);
+            // set the value with endpoint's option
+            if (httpClientAPI == null) {
+                httpClientAPI = ((CxfRsEndpoint) 
getEndpoint()).isHttpClientAPI();
+            }
+            if (httpClientAPI.booleanValue()) {
+                invokeAsyncHttpClient(exchange, callback);
+            } else {
+                invokeAsyncProxyClient(exchange, callback);
+            }
+            return false;
+        } catch (Exception exception) {
+            LOG.error("Error invoking request", exception);
+            exchange.setException(exception);
+            callback.done(true);
+            return true;
+        }
+    }
+
+    protected void invokeAsyncHttpClient(Exchange exchange, final 
AsyncCallback callback) throws Exception {
+        Message inMessage = exchange.getIn();
+        JAXRSClientFactoryBean cfb = 
clientFactoryBeanCache.get(CxfEndpointUtils
+            .getEffectiveAddress(exchange, ((CxfRsEndpoint) 
getEndpoint()).getAddress()));
+        Bus bus = ((CxfRsEndpoint) getEndpoint()).getBus();
+        // We need to apply the bus setting from the CxfRsEndpoint, which may 
not be the default bus
+        if (bus != null) {
+            cfb.setBus(bus);
+        }
+        WebClient client = cfb.createWebClient();
+        ((CxfRsEndpoint) 
getEndpoint()).getChainedCxfRsEndpointConfigurer().configureClient(client);
+        String httpMethod = inMessage.getHeader(Exchange.HTTP_METHOD, 
String.class);
+        Class<?> responseClass = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Class.class);
+        Type genericType = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_GENERIC_TYPE, 
Type.class);
+        Object[] pathValues = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class);
+        String path = inMessage.getHeader(Exchange.HTTP_PATH, String.class);
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("HTTP method = {}", httpMethod);
+            LOG.trace("path = {}", path);
+            LOG.trace("responseClass = {}", responseClass);
+        }
+
+        // set the path
+        if (path != null) {
+            if (ObjectHelper.isNotEmpty(pathValues) && pathValues.length > 0) {
+                client.path(path, pathValues);
+            } else {
+                client.path(path);
+            }
+        }
+
+        CxfRsEndpoint cxfRsEndpoint = (CxfRsEndpoint) getEndpoint();
+        CxfRsBinding binding = cxfRsEndpoint.getBinding();
+        Object body = getBody(exchange, inMessage, httpMethod, cxfRsEndpoint, 
binding);
+        setupClientMatrix(client, exchange);
+        setupClientQueryAndHeaders(client, exchange);
+
+        //Build message entity
+        Entity<Object> entity = binding.bindCamelMessageToRequestEntity(body, 
inMessage, exchange);
+
+        // handle cookies
+        CookieHandler cookieHandler = 
((CxfRsEndpoint)getEndpoint()).getCookieHandler();
+        loadCookies(exchange, client, cookieHandler);
+        
+        // invoke the client
+        client.async().method(httpMethod, entity, new 
CxfInvocationCallback(client, exchange, cxfRsEndpoint, responseClass, callback, 
genericType));
+    }
+
+    protected void invokeAsyncProxyClient(Exchange exchange, final 
AsyncCallback callback) throws Exception {
+        Message inMessage = exchange.getIn();
+        Object[] varValues = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class);
+        String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, 
String.class);
+        Client target;
+
+        JAXRSClientFactoryBean cfb = 
clientFactoryBeanCache.get(CxfEndpointUtils
+            .getEffectiveAddress(exchange, 
((CxfRsEndpoint)getEndpoint()).getAddress()));
+        Bus bus = ((CxfRsEndpoint)getEndpoint()).getBus();
+        // We need to apply the bus setting from the CxfRsEndpoint, which may 
not be the default bus
+        if (bus != null) {
+            cfb.setBus(bus);
+        }
+        if (varValues == null) {
+            target = cfb.create();
+        } else {
+            target = cfb.createWithValues(varValues);
+        }
+
+        setupClientHeaders(target, exchange);
+
+        // find out the method which we want to invoke
+        JAXRSServiceFactoryBean sfb = cfb.getServiceFactory();
+        sfb.getResourceClasses();
+        // check the null body first
+        Object[] parameters = null;
+        if (inMessage.getBody() != null) {
+            parameters = inMessage.getBody(Object[].class);
+        }
+        // get the method
+        Method method = findRightMethod(sfb.getResourceClasses(), methodName, 
getParameterTypes(parameters));
+
+        CxfRsEndpoint cxfRsEndpoint = (CxfRsEndpoint) getEndpoint();
+        final CxfProxyInvocationCallback invocationCallback = new 
CxfProxyInvocationCallback(target, exchange, cxfRsEndpoint, callback);
+        
WebClient.getConfig(target).getRequestContext().put(InvocationCallback.class.getName(),
 invocationCallback);
+        
+        // handle cookies
+        CookieHandler cookieHandler = 
((CxfRsEndpoint)getEndpoint()).getCookieHandler();
+        loadCookies(exchange, target, cookieHandler);
+        
+        method.invoke(target, parameters);
+    }
+
+
     @SuppressWarnings("unchecked")
     protected void setupClientQueryAndHeaders(WebClient client, Exchange 
exchange) throws Exception {
         Message inMessage = exchange.getIn();
@@ -192,19 +314,7 @@ public class CxfRsProducer extends DefaultProducer {
 
         CxfRsBinding binding = cxfRsEndpoint.getBinding();
 
-        // set the body
-        Object body = null;
-        if (!"GET".equals(httpMethod)) {
-            // need to check the request object if the http Method is not GET  
    
-            if ("DELETE".equals(httpMethod) && 
cxfRsEndpoint.isIgnoreDeleteMethodMessageBody()) {
-                // just ignore the message body if the 
ignoreDeleteMethodMessageBody is true
-            } else {
-                body = binding.bindCamelMessageBodyToRequestBody(inMessage, 
exchange);
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Request body = " + body);
-                }
-            }
-        }
+        Object body = getBody(exchange, inMessage, httpMethod, cxfRsEndpoint, 
binding);
         
         setupClientMatrix(client, exchange); 
 
@@ -507,7 +617,217 @@ public class CxfRsProducer extends DefaultProducer {
 
         return answer;
     }
-    
+
+    private Object getBody(Exchange exchange, Message inMessage, String 
httpMethod, CxfRsEndpoint cxfRsEndpoint, CxfRsBinding binding) throws Exception 
{
+        Object body = null;
+        if (!"GET".equals(httpMethod)) {
+            // need to check the request object if the http Method is not GET  
    
+            if ("DELETE".equals(httpMethod) && 
cxfRsEndpoint.isIgnoreDeleteMethodMessageBody()) {
+                // just ignore the message body if the 
ignoreDeleteMethodMessageBody is true
+            } else {
+                body = binding.bindCamelMessageBodyToRequestBody(inMessage, 
exchange);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Request body = " + body);
+                }
+            }
+        }
+        return body;
+    }
+    private final class CxfInvocationCallback implements 
InvocationCallback<Response> {
+
+        private final Exchange exchange;
+        private final CxfRsEndpoint cxfRsEndpoint;
+        private final Class<?> responseClass;
+        private final AsyncCallback callback;
+        private final Type genericType;
+        private final Client client;
+
+        private CxfInvocationCallback(Client client, Exchange exchange, 
CxfRsEndpoint cxfRsEndpoint, Class<?> responseClass, AsyncCallback callback, 
Type genericType) {
+            this.exchange = exchange;
+            this.cxfRsEndpoint = cxfRsEndpoint;
+            this.responseClass = responseClass;
+            this.callback = callback;
+            this.genericType = genericType;
+            this.client = client;
+        }
+
+
+        @Override
+        public void completed(Response response) {
+            try {
+                if (shouldHandleError(response)) {
+                    handleError(response);
+                    return;
+                }
+                // handle cookies
+                saveCookies(exchange, client, 
cxfRsEndpoint.getCookieHandler());
+                if (!exchange.getPattern().isOutCapable()) {
+                    return;
+                }
+
+                LOG.trace("Response body = {}", response);
+                
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+                final CxfRsBinding binding = cxfRsEndpoint.getBinding();
+                
exchange.getOut().getHeaders().putAll(binding.bindResponseHeadersToCamelHeaders(response,
 exchange));
+                
+                if (genericType != null && !genericType.equals(Void.TYPE)) {
+                    GenericType genericType = new 
GenericType(this.genericType);
+                    
exchange.getOut().setBody(binding.bindResponseToCamelBody(response.readEntity(genericType),
 exchange));
+                } else if (responseClass != null && 
!responseClass.equals(Void.TYPE)) {
+                    
exchange.getOut().setBody(binding.bindResponseToCamelBody(response.readEntity(responseClass),
 exchange));
+                } else {
+                    
exchange.getOut().setBody(binding.bindResponseToCamelBody(response, exchange));
+                }
+                exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 
response.getStatus());
+            } catch (Exception exception) {
+                LOG.error("Error while processing response", exception);
+                fail(exception);
+            } finally {
+                callback.done(false);
+            }
+        }
+
+        @Override
+        public void failed(Throwable throwable) {
+            LOG.error("Failed request ", throwable);
+            try {
+                // handle cookies
+                saveCookies(exchange, client, 
cxfRsEndpoint.getCookieHandler());
+                fail(throwable);
+            } catch (Exception error) {
+                LOG.error("Error while processing failed request", error);
+            } finally {
+                callback.done(false);
+            }
+        }
+
+        private void fail(Throwable throwable) {
+            if 
(throwable.getClass().isInstance(WebApplicationException.class)) {
+                final WebApplicationException cast = 
WebApplicationException.class.cast(throwable);
+                final Response response = cast.getResponse();
+                if (shouldHandleError(response)) {
+                    handleError(response);
+                }
+            } else if 
(throwable.getClass().isInstance(ResponseProcessingException.class)) {
+                final ResponseProcessingException cast = 
ResponseProcessingException.class.cast(throwable);
+                final Response response = cast.getResponse();
+                if (shouldHandleError(response)) {
+                    handleError(response);
+                }
+            } else {
+                exchange.setException(throwable);    
+            }
+        }
+
+        private boolean shouldHandleError(Response response) {
+            //Throw exception on a response > 207
+            //http://en.wikipedia.org/wiki/List_of_HTTP_status_codes
+            if (response != null && throwException) {
+                Integer respCode = response.getStatus();
+                if (respCode > 207) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        private void handleError(Response response) {
+            exchange.setException(populateCxfRsProducerException(exchange, 
response, response.getStatus()));
+        }
+    }
+
+    private final class CxfProxyInvocationCallback implements 
InvocationCallback<Object> {
+
+        private final Exchange exchange;
+        private final CxfRsEndpoint cxfRsEndpoint;
+        private final AsyncCallback callback;
+        private final Client client;
+
+        private CxfProxyInvocationCallback(Client client, Exchange exchange, 
CxfRsEndpoint cxfRsEndpoint, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.cxfRsEndpoint = cxfRsEndpoint;
+            this.callback = callback;
+            this.client = client;
+        }
+
+        @Override
+        public void completed(Object body) {
+            try {
+                Response response = client.getResponse();
+                // handle cookies
+                saveCookies(exchange, client, 
cxfRsEndpoint.getCookieHandler());
+                //handle error
+                if (shouldHandleError(response)) {
+                    handleError(response);
+                    return;
+                }
+                if (!exchange.getPattern().isOutCapable()) {
+                    return;
+                }
+
+                LOG.trace("Response body = {}", response);
+                
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+                final CxfRsBinding binding = cxfRsEndpoint.getBinding();
+                
exchange.getOut().getHeaders().putAll(binding.bindResponseHeadersToCamelHeaders(response,
 exchange));
+                
exchange.getOut().setBody(binding.bindResponseToCamelBody(body, exchange));
+                exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 
response.getStatus());
+            } catch (Exception exception) {
+                LOG.error("Error while processing response", exception);
+                fail(exception);
+            } finally {
+                callback.done(false);
+            }
+        }
+
+        @Override
+        public void failed(Throwable throwable) {
+            LOG.error("Failed request ", throwable);
+            try {
+                // handle cookies
+                saveCookies(exchange, client, 
cxfRsEndpoint.getCookieHandler());
+                fail(throwable);
+            } catch (Exception error) {
+                LOG.error("Error while processing failed request", error);
+            } finally {
+                callback.done(false);
+            }
+        }
+
+        private void fail(Throwable throwable) {
+            if 
(throwable.getClass().isInstance(WebApplicationException.class)) {
+                final WebApplicationException cast = 
WebApplicationException.class.cast(throwable);
+                final Response response = cast.getResponse();
+                if (shouldHandleError(response)) {
+                    handleError(response);
+                }
+            } else if 
(throwable.getClass().isInstance(ResponseProcessingException.class)) {
+                final ResponseProcessingException cast = 
ResponseProcessingException.class.cast(throwable);
+                final Response response = cast.getResponse();
+                if (shouldHandleError(response)) {
+                    handleError(response);
+                }
+            } else {
+                exchange.setException(throwable);
+            }
+        }
+
+        private void handleError(Response response) {
+            exchange.setException(populateCxfRsProducerException(exchange, 
response, response.getStatus()));
+        }
+
+        private boolean shouldHandleError(Response response) {
+            //Throw exception on a response > 207
+            //http://en.wikipedia.org/wiki/List_of_HTTP_status_codes
+            if (response != null && throwException) {
+                Integer respCode = response.getStatus();
+                if (respCode > 207) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
     /**
      * Cache contains {@link 
org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean}
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/DefaultCxfRsBinding.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/DefaultCxfRsBinding.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/DefaultCxfRsBinding.java
index 673ec6a..1658a94 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/DefaultCxfRsBinding.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/DefaultCxfRsBinding.java
@@ -21,11 +21,15 @@ import java.lang.reflect.Method;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import javax.security.auth.Subject;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Variant;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -183,6 +187,18 @@ public class DefaultCxfRsBinding implements CxfRsBinding, 
HeaderFilterStrategyAw
         return answer;
     }
 
+    public Entity<Object> bindCamelMessageToRequestEntity(Object body, Message 
camelMessage, Exchange camelExchange) throws Exception  {
+        if (body == null) {
+            return null;
+        }
+        String contentType = camelMessage.getHeader(Exchange.CONTENT_TYPE, 
String.class);
+        if (contentType == null) {
+            contentType = MediaType.WILDCARD;
+        }
+        String contentEncoding = 
camelMessage.getHeader(Exchange.CONTENT_ENCODING, String.class);
+        return  Entity.entity(body, new 
Variant(MediaType.valueOf(contentType), Locale.US, contentEncoding));
+    }
+
     /**
      *  By default, we just return the response object. 
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerSessionTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerSessionTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerSessionTest.java
new file mode 100644
index 0000000..4546aa0
--- /dev/null
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerSessionTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.jaxrs;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.cxf.CXFTestSupport;
+import org.apache.camel.component.cxf.common.message.CxfConstants;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class CxfRsAsyncProducerSessionTest extends CamelSpringTestSupport {
+    private static int port1 = CXFTestSupport.getPort1();
+    private static int port2 = 
CXFTestSupport.getPort("CxfRsProducerSessionTest.jetty");
+
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+
+    public int getPort1() {
+        return port1;
+    }
+
+    public int getPort2() {
+        return port2;
+    }
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        return new 
ClassPathXmlApplicationContext("org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducerSession.xml");
+    }
+
+    @Test
+    public void testNoSessionProxy() {
+        String response = sendMessage("direct://proxy", "World", 
Boolean.FALSE).getOut().getBody(String.class);
+        assertEquals("New New World", response);
+        response = sendMessage("direct://proxy", "World", 
Boolean.FALSE).getOut().getBody(String.class);
+        assertEquals("New New World", response);
+    }
+
+    @Test
+    public void testExchangeSessionProxy() {
+        String response = sendMessage("direct://proxyexchange", "World", 
Boolean.FALSE).getOut().getBody(String.class);
+        assertEquals("Old New World", response);
+        response = sendMessage("direct://proxyexchange", "World", 
Boolean.FALSE).getOut().getBody(String.class);
+        assertEquals("Old New World", response);
+    }
+
+    @Test
+    public void testInstanceSession() {
+        String response = sendMessage("direct://proxyinstance", "World", 
Boolean.FALSE).getOut().getBody(String.class);
+        assertEquals("Old New World", response);
+        response = sendMessage("direct://proxyinstance", "World", 
Boolean.FALSE).getOut().getBody(String.class);
+        assertEquals("Old Old World", response);
+        // we do the instance tests for proxy and http in one test because 
order
+        // matters here
+        response = sendMessage("direct://httpinstance", "World", 
Boolean.TRUE).getOut().getBody(String.class);
+        assertEquals("Old Old World", response);
+    }
+
+    @Test
+    public void testNoSessionHttp() {
+        String response = sendMessage("direct://http", "World", 
Boolean.TRUE).getOut().getBody(String.class);
+        assertEquals("New New World", response);
+        response = sendMessage("direct://http", "World", 
Boolean.TRUE).getOut().getBody(String.class);
+        assertEquals("New New World", response);
+    }
+
+    @Test
+    public void testExchangeSessionHttp() {
+        String response = sendMessage("direct://httpexchange", "World", 
Boolean.TRUE).getOut().getBody(String.class);
+        assertEquals("Old New World", response);
+        response = sendMessage("direct://httpexchange", "World", 
Boolean.TRUE).getOut().getBody(String.class);
+        assertEquals("Old New World", response);
+    }
+
+    private Exchange sendMessage(String endpoint, String body, Boolean 
httpApi) {
+        Exchange exchange = template.send(endpoint, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.setPattern(ExchangePattern.InOut);
+                Message inMessage = exchange.getIn();
+                inMessage.setHeader(CxfConstants.OPERATION_NAME, "echo");
+                inMessage.setHeader(Exchange.HTTP_METHOD, "POST");
+                inMessage.setHeader(Exchange.HTTP_PATH, "/echoservice/echo");
+                inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, 
httpApi);
+                inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, 
String.class);
+                inMessage.setHeader(Exchange.ACCEPT_CONTENT_TYPE, 
"application/json");
+                inMessage.setBody(body);
+            }
+        });
+        return exchange;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerTest.java
new file mode 100644
index 0000000..b7ee1c4
--- /dev/null
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerTest.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.jaxrs;
+
+import java.io.InputStream;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.component.cxf.CXFTestSupport;
+import org.apache.camel.component.cxf.CxfOperationException;
+import org.apache.camel.component.cxf.common.message.CxfConstants;
+import org.apache.camel.component.cxf.jaxrs.testbean.Customer;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.apache.camel.util.CastUtils;
+import org.apache.cxf.Bus;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.feature.Feature;
+import org.apache.cxf.interceptor.InterceptorProvider;
+import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+
+public class CxfRsAsyncProducerTest extends CamelSpringTestSupport {
+    // ports obtained from CXFTestSupport; port2 is looked up under the name "CxfRsAsyncProducerTest.jetty"
+    private static int port1 = CXFTestSupport.getPort1();
+    private static int port2 = CXFTestSupport.getPort("CxfRsAsyncProducerTest.jetty");
+    
+    /**
+     * Processor that copies the incoming {@code Exchange.HTTP_QUERY} header into the out body,
+     * so the caller can check which query string was received.
+     */
+    public static class JettyProcessor implements Processor {
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // echo the query string back as the response body
+            Message inMessage = exchange.getIn();
+            exchange.getOut().setBody(inMessage.getHeader(Exchange.HTTP_QUERY, String.class));
+        }
+    }
+    /**
+     * Always returns {@code true}.
+     * NOTE(review): presumably this makes the test support create the Camel context once per
+     * test class instead of once per test method; confirm against the base class.
+     */
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+
+    /** Returns the value of the static {@code port1} field. */
+    public int getPort1() {
+        return port1;
+    }
+    /** Returns the value of the static {@code port2} field. */
+    public int getPort2() {
+        return port2;
+    }
+    
+    /**
+     * Creates the Spring application context for this test from
+     * {@code org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml} on the classpath.
+     */
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml");
+    }
+
+    /**
+     * Calls the {@code getCustomer} operation through {@code direct://proxy} using the proxy client API
+     * and checks the returned customer, the 200 response code and the custom {@code key} header
+     * on the out message.
+     */
+    @Test
+    public void testGetCustomerWithClientProxyAPI() {
+        // START SNIPPET: ProxyExample
+        Exchange exchange = template.send("direct://proxy", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.setPattern(ExchangePattern.InOut);
+                Message inMessage = exchange.getIn();
+                // set the operation name
+                inMessage.setHeader(CxfConstants.OPERATION_NAME, "getCustomer");
+                // using the proxy client API
+                inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.FALSE);
+                // set a custom header
+                inMessage.setHeader("key", "value");
+                // setup the accept content type
+                inMessage.setHeader(Exchange.ACCEPT_CONTENT_TYPE, "application/json");
+                // set the parameters, if you just have one parameter
+                // camel will put this object into an Object[] itself
+                inMessage.setBody("123");
+            }
+        });
+
+        // get the response message
+        Customer response = (Customer) exchange.getOut().getBody();
+
+        assertNotNull("The response should not be null ", response);
+        // JUnit expects (message, expected, actual)
+        assertEquals("Get a wrong customer id ", "123", String.valueOf(response.getId()));
+        assertEquals("Get a wrong customer name", "John", response.getName());
+        assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE));
+        assertEquals("Get a wrong header value", "value", exchange.getOut().getHeader("key"));
+        // END SNIPPET: ProxyExample
+    }
+    
+    /**
+     * Calls the {@code getCustomers} operation through {@code direct://proxy} with the proxy client API
+     * and checks that the returned list contains the customers Dan (113) and John (123).
+     */
+    @Test
+    public void testGetCustomersWithClientProxyAPI() {
+        Exchange exchange = template.send("direct://proxy", newExchange -> {
+            newExchange.setPattern(ExchangePattern.InOut);
+            Message inMessage = newExchange.getIn();
+            // set the operation name
+            inMessage.setHeader(CxfConstants.OPERATION_NAME, "getCustomers");
+            // using the proxy client API
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.FALSE);
+            // no request body is set for this call
+            inMessage.setBody(null);
+        });
+
+        // get the response message
+        List<Customer> response = CastUtils.cast((List<?>) exchange.getOut().getBody());
+
+        assertNotNull("The response should not be null ", response);
+        assertTrue("Dan is missing!", response.contains(new Customer(113, "Dan")));
+        assertTrue("John is missing!", response.contains(new Customer(123, "John")));
+        assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE));
+    }
+
+    /**
+     * Issues a {@code GET /customerservice/customers/} through {@code direct://proxy} with the HTTP API
+     * header enabled, asks for a {@code List<Customer>} response and checks that it contains
+     * Dan (113) and John (123).
+     */
+    @Test
+    public void testGetCustomersWithHttpCentralClientAPI() {
+        Exchange exchange = template.send("direct://proxy", newExchange -> {
+            newExchange.setPattern(ExchangePattern.InOut);
+            Message inMessage = newExchange.getIn();
+            // set the Http method
+            inMessage.setHeader(Exchange.HTTP_METHOD, "GET");
+            // set the relative path
+            inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/");
+            // using the http central client API
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.TRUE);
+            // set the response class and its generic type
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, List.class);
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_GENERIC_TYPE, new ParameterizedCollectionType(Customer.class));
+            // since we use the Get method, no message body is set
+            inMessage.setBody(null);
+        });
+
+        // get the response message
+        List<Customer> response = CastUtils.cast((List<?>) exchange.getOut().getBody());
+
+        assertNotNull("The response should not be null ", response);
+        assertTrue("Dan is missing!", response.contains(new Customer(113, "Dan")));
+        assertTrue("John is missing!", response.contains(new Customer(123, "John")));
+        assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE));
+    }
+    
+    /**
+     * Issues a {@code GET /customerservice/customers/123} through {@code direct://http} with the HTTP API
+     * header enabled and checks the returned customer, the 200 response code and the custom
+     * {@code key} header on the out message.
+     */
+    @Test
+    public void testGetCustomerWithHttpCentralClientAPI() {
+        Exchange exchange = template.send("direct://http", newExchange -> {
+            newExchange.setPattern(ExchangePattern.InOut);
+            Message inMessage = newExchange.getIn();
+            // using the http central client API
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.TRUE);
+            // set the Http method
+            inMessage.setHeader(Exchange.HTTP_METHOD, "GET");
+            // set the relative path
+            inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/123");
+            // specify the response class; per the original note cxfrs otherwise
+            // uses InputStream as the response object type (presumably the default)
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class);
+            // set a custom header
+            inMessage.setHeader("key", "value");
+            // since we use the Get method, we don't need to set the message body
+            inMessage.setBody(null);
+        });
+
+        // get the response message
+        Customer response = (Customer) exchange.getOut().getBody();
+
+        assertNotNull("The response should not be null ", response);
+        // JUnit expects (message, expected, actual)
+        assertEquals("Get a wrong customer id ", "123", String.valueOf(response.getId()));
+        assertEquals("Get a wrong customer name", "John", response.getName());
+        assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE));
+        assertEquals("Get a wrong header value", "value", exchange.getOut().getHeader("key"));
+    }
+    
+    /**
+     * Sends a {@code PUT} for a customer with id 222 to a cxfrs endpoint configured with
+     * {@code throwExceptionOnFailure=false} and checks that no exception is set on the exchange
+     * while the out message carries response code 406.
+     */
+    @Test
+    public void testSuppressGetCustomerExceptionWithCxfRsEndpoint() {
+        Exchange exchange
+            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true&throwExceptionOnFailure=false", newExchange -> {
+                newExchange.setPattern(ExchangePattern.InOut);
+                Message message = newExchange.getIn();
+                // set the Http method
+                message.setHeader(Exchange.HTTP_METHOD, "PUT");
+                // set the relative path
+                message.setHeader(Exchange.HTTP_PATH, "/customerservice/customers");
+                // we just setup the customer with a wrong id
+                Customer customer = new Customer();
+                customer.setId(222);
+                customer.setName("user");
+                message.setBody(customer);
+            });
+
+        // throwExceptionOnFailure=false: the failure must not surface as an exception
+        assertNull("Don't expect the exception here", exchange.getException());
+        Message result = exchange.getOut();
+        // JUnit expects (message, expected, actual)
+        assertEquals("Get a wrong http status code.", 406, result.getHeader(Exchange.HTTP_RESPONSE_CODE));
+    }
+    
+    /**
+     * Sends a {@code PUT} for a customer with id 222 to a cxfrs endpoint using the HTTP client API
+     * and checks that the exchange carries a {@code CxfOperationException} whose response body is
+     * {@code "Cannot find the customer!"}.
+     */
+    @Test
+    public void testGetCustomerExceptionWithCxfRsEndpoint() {
+        Exchange exchange
+            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", newExchange -> {
+                newExchange.setPattern(ExchangePattern.InOut);
+                Message message = newExchange.getIn();
+                // set the Http method
+                message.setHeader(Exchange.HTTP_METHOD, "PUT");
+                // set the relative path
+                message.setHeader(Exchange.HTTP_PATH, "/customerservice/customers");
+                // we just setup the customer with a wrong id
+                Customer customer = new Customer();
+                customer.setId(222);
+                customer.setName("user");
+                message.setBody(customer);
+            });
+
+        // we should get the exception here
+        assertNotNull("Expect the exception here", exchange.getException());
+        CxfOperationException exception = (CxfOperationException)exchange.getException();
+
+        assertEquals("Get a wrong response body", "Cannot find the customer!", exception.getResponseBody());
+    }
+    
+    /**
+     * Issues a {@code GET /customerservice/customers/123} against a cxfrs endpoint URI using the
+     * HTTP client API and checks the returned customer and the 200 response code.
+     */
+    @Test
+    public void testGetCustomerWithCxfRsEndpoint() {
+        Exchange exchange
+            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", newExchange -> {
+                newExchange.setPattern(ExchangePattern.InOut);
+                Message inMessage = newExchange.getIn();
+                // set the Http method
+                inMessage.setHeader(Exchange.HTTP_METHOD, "GET");
+                // set the relative path
+                inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/123");
+                // specify the response class; per the original note cxfrs otherwise
+                // uses InputStream as the response object type (presumably the default)
+                inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class);
+                // since we use the Get method, we don't need to set the message body
+                inMessage.setBody(null);
+            });
+
+        // get the response message
+        Customer response = (Customer) exchange.getOut().getBody();
+        assertNotNull("The response should not be null ", response);
+        // JUnit expects (message, expected, actual)
+        assertEquals("Get a wrong customer id ", "123", String.valueOf(response.getId()));
+        assertEquals("Get a wrong customer name", "John", response.getName());
+        assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE));
+    }
+
+    /**
+     * Issues a {@code GET} whose path contains the {@code {customerId}} template and supplies the
+     * value {@code "123"} through the {@code CxfConstants.CAMEL_CXF_RS_VAR_VALUES} header, then checks
+     * the returned customer and the 200 response code.
+     */
+    @Test
+    public void testGetCustomerWithVariableReplacementAndCxfRsEndpoint() {
+        Exchange exchange = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", newExchange -> {
+            newExchange.setPattern(ExchangePattern.InOut);
+            Message inMessage = newExchange.getIn();
+            // set the Http method
+            inMessage.setHeader(Exchange.HTTP_METHOD, "GET");
+            // set the relative path
+            inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/{customerId}");
+            // Set variables for replacement
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, new String[] {"123"});
+            // specify the response class; per the original note cxfrs otherwise
+            // uses InputStream as the response object type (presumably the default)
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class);
+            // since we use the Get method, we don't need to set the message body
+            inMessage.setBody(null);
+        });
+
+        // get the response message
+        Customer response = (Customer) exchange.getOut().getBody();
+        assertNotNull("The response should not be null ", response);
+        // JUnit expects (message, expected, actual)
+        assertEquals("Get a wrong customer id ", "123", String.valueOf(response.getId()));
+        assertEquals("Get a wrong customer name", "John", response.getName());
+        assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE));
+    }
+    
+    /**
+     * Posts a new customer to {@code /customerservice/customersUniqueResponseCode} using the HTTP client API
+     * without a response class, and checks that the out body is a JAX-RS {@code Response} with an entity
+     * and status 201, matching the response code header.
+     */
+    @Test
+    public void testAddCustomerUniqueResponseCodeWithHttpClientAPI() {
+        Exchange exchange
+            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true", newExchange -> {
+                newExchange.setPattern(ExchangePattern.InOut);
+                Message inMessage = newExchange.getIn();
+                // set the Http method
+                inMessage.setHeader(Exchange.HTTP_METHOD, "POST");
+                // set the relative path
+                inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customersUniqueResponseCode");
+                // create a new customer object
+                Customer customer = new Customer();
+                customer.setId(9999);
+                customer.setName("HttpClient");
+                inMessage.setBody(customer);
+            });
+
+        // get the response message
+        Response response = (Response) exchange.getOut().getBody();
+        assertNotNull("The response should not be null ", response);
+        assertNotNull("The response entity should not be null", response.getEntity());
+        // check the response code
+        assertEquals("Get a wrong response code", 201, response.getStatus());
+        // check the response code from message header
+        assertEquals("Get a wrong response code", 201, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE));
+    }
+
+    /**
+     * Calls the {@code addCustomerUniqueResponseCode} operation through {@code direct://proxy} with the
+     * proxy client API and checks that the out body is a JAX-RS {@code Response} with an entity and
+     * status 201, matching the response code header.
+     */
+    @Test
+    public void testAddCustomerUniqueResponseCodeWithProxyAPI() {
+        Exchange exchange = template.send("direct://proxy", newExchange -> {
+            newExchange.setPattern(ExchangePattern.InOut);
+            Message inMessage = newExchange.getIn();
+            // set the operation name
+            inMessage.setHeader(CxfConstants.OPERATION_NAME, "addCustomerUniqueResponseCode");
+            // using the proxy client API
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.FALSE);
+            // set the parameters, if you just have one parameter
+            // camel will put this object into an Object[] itself
+            Customer customer = new Customer();
+            customer.setId(8888);
+            customer.setName("ProxyAPI");
+            inMessage.setBody(customer);
+        });
+
+        // get the response message
+        Response response = (Response) exchange.getOut().getBody();
+        assertNotNull("The response should not be null ", response);
+        assertNotNull("The response entity should not be null", response.getEntity());
+        // check the response code
+        assertEquals("Get a wrong response code", 201, response.getStatus());
+        // check the response code from message header
+        assertEquals("Get a wrong response code", 201, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE));
+    }
+
+    /**
+     * Posts a new customer to {@code /customerservice/customersUniqueResponseCode} with
+     * {@code Customer} as the response class, and checks that the returned customer keeps the name
+     * {@code "Willem"}, has an id other than the submitted 8888, and that the response code is 201.
+     */
+    @Test
+    public void testAddCustomerUniqueResponseCode() {
+        Exchange exchange
+            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true", newExchange -> {
+                newExchange.setPattern(ExchangePattern.InOut);
+                Message inMessage = newExchange.getIn();
+                // set the Http method
+                inMessage.setHeader(Exchange.HTTP_METHOD, "POST");
+                // set the relative path
+                inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customersUniqueResponseCode");
+                // put the response's entity into out message body
+                inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class);
+                // create a new customer object
+                Customer customer = new Customer();
+                customer.setId(8888);
+                customer.setName("Willem");
+                inMessage.setBody(customer);
+            });
+
+        // get the response message
+        Customer response = (Customer) exchange.getOut().getBody();
+        assertNotNull("The response should not be null ", response);
+        assertTrue("Get a wrong customer id ", response.getId() != 8888);
+        // JUnit expects (message, expected, actual)
+        assertEquals("Get a wrong customer name", "Willem", response.getName());
+        assertEquals("Get a wrong response code", 201, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE));
+    }
+    
+    /**
+     * Sends a {@code GET} to the {@code /testQuery} address on port2 with {@code q1=12&q2=13} in the
+     * endpoint URI and checks that the response body equals that query string.
+     */
+    @Test
+    public void testProducerWithQueryParameters() {
+        Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13", newExchange -> {
+            newExchange.setPattern(ExchangePattern.InOut);
+            Message inMessage = newExchange.getIn();
+            // set the Http method
+            inMessage.setHeader(Exchange.HTTP_METHOD, "GET");
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, InputStream.class);
+            inMessage.setBody(null);
+        });
+
+        // get the response message
+        String response = exchange.getOut().getBody(String.class);
+        assertNotNull("The response should not be null ", response);
+        assertEquals("The response value is wrong", "q1=12&q2=13", response);
+    }
+    
+    /**
+     * Same request as {@link #testProducerWithQueryParameters()}, but additionally sets a query map
+     * header; the expected response {@code q1=new&q2=world} shows the header values replace the
+     * ones given in the endpoint URI.
+     */
+    @Test
+    public void testProducerWithQueryParametersHeader() {
+        Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13", newExchange -> {
+            newExchange.setPattern(ExchangePattern.InOut);
+            Message inMessage = newExchange.getIn();
+            // set the Http method
+            inMessage.setHeader(Exchange.HTTP_METHOD, "GET");
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, InputStream.class);
+            // override the parameter setting from URI
+            // START SNIPPET: QueryMapExample
+            Map<String, String> queryMap = new LinkedHashMap<>();
+            queryMap.put("q1", "new");
+            queryMap.put("q2", "world");
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_QUERY_MAP, queryMap);
+            // END SNIPPET: QueryMapExample
+            inMessage.setBody(null);
+        });
+
+        // get the response message
+        String response = exchange.getOut().getBody(String.class);
+        assertNotNull("The response should not be null ", response);
+        assertEquals("The response value is wrong", "q1=new&q2=world", response);
+    }
+    
+    
+
+    /**
+     * Requests {@code /customerservice/customers/123} with a {@code GET} directly through a cxfrs
+     * endpoint URI and checks that a non-null String response comes back.
+     */
+    @Test
+    public void testRestServerDirectlyGetCustomer() {
+        // we cannot convert directly to Customer as we need camel-jaxb
+        String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123",
+                null, Exchange.HTTP_METHOD, "GET", String.class);
+
+        assertNotNull("The response should not be null ", response);
+    }
+
+    /**
+     * Posts a customer named "Donald Duck" directly through a cxfrs endpoint URI and checks that the
+     * String response ends with the XML for that customer name.
+     */
+    @Test
+    public void testRestServerDirectlyAddCustomer() {
+        Customer input = new Customer();
+        input.setName("Donald Duck");
+
+        // we cannot convert directly to Customer as we need camel-jaxb
+        String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers",
+                input, Exchange.HTTP_METHOD, "POST", String.class);
+
+        assertNotNull(response);
+        assertTrue(response.endsWith("<name>Donald Duck</name></Customer>"));
+    }
+    
+    /**
+     * CXF feature used by {@link #testProducerWithFeature()}: it records in {@code initialized} that
+     * {@link #initialize(InterceptorProvider, Bus)} was called; the other overloads do nothing.
+     */
+    static class TestFeature implements Feature {
+        boolean initialized;
+        @Override
+        public void initialize(InterceptorProvider interceptorProvider, Bus bus) {
+            initialized = true;
+        }
+        @Override
+        public void initialize(Client client, Bus bus) {
+            //Do nothing
+        }
+        @Override
+        public void initialize(Server server, Bus bus) {
+            //Do nothing
+        }
+        @Override
+        public void initialize(Bus bus) {
+            //Do nothing
+        }
+    }
+
+    /**
+     * Looks up the {@code testFeature} bean, sends a {@code GET} through an endpoint URI that carries
+     * {@code features=#myFeatures}, and checks that the feature was initialized.
+     */
+    @Test
+    public void testProducerWithFeature() {
+        TestFeature feature = context().getRegistry().lookupByNameAndType("testFeature", TestFeature.class);
+        // fail with a clear message instead of a NullPointerException if the bean is missing
+        assertNotNull("The testFeature bean should be in the registry", feature);
+
+        template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123?features=#myFeatures",
+                null, Exchange.HTTP_METHOD, "GET", String.class);
+
+        assertTrue("The feature should be initialized", feature.initialized);
+    }
+
+    /**
+     * Posts a customer with an empty name and checks that the exchange carries a
+     * {@code CxfOperationException} with status code 422.
+     */
+    @Test
+    public void testProducer422Response() {
+        Exchange exchange = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", newExchange -> {
+            newExchange.setPattern(ExchangePattern.InOut);
+            Message message = newExchange.getIn();
+            // Try to create a new Customer with an invalid name
+            message.setHeader(Exchange.HTTP_METHOD, "POST");
+            message.setHeader(Exchange.HTTP_PATH, "/customerservice/customers");
+            Customer customer = new Customer();
+            customer.setId(8888);
+            // will trigger a 422 response (a common REST server validation response code)
+            customer.setName("");
+            message.setBody(customer);
+        });
+
+        assertNotNull("Expect the exception here", exchange.getException());
+        assertThat("Exception should be a CxfOperationException", exchange.getException(), instanceOf(CxfOperationException.class));
+
+        CxfOperationException cxfOperationException = CxfOperationException.class.cast(exchange.getException());
+
+        assertThat("CXF operation exception has correct response code", cxfOperationException.getStatusCode(), is(422));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncRelayTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncRelayTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncRelayTest.java
new file mode 100644
index 0000000..11618c9
--- /dev/null
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncRelayTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.jaxrs;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jws.WebMethod;
+import javax.jws.WebService;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cxf.CXFTestSupport;
+import org.apache.camel.spring.Main;
+import org.apache.camel.test.junit4.TestSupport;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.jaxrs.client.JAXRSClientFactory;
+import org.apache.cxf.jaxrs.ext.multipart.Multipart;
+import org.junit.Test;
+
+public class CxfRsAsyncRelayTest extends TestSupport {
+    private static int port6 = CXFTestSupport.getPort6();
+    /**
+     * A sample service "interface" (technically, it is a class since we will
+     * use proxy-client). It exposes three methods overloading each other:
+     * we are testing that the appropriate one will be chosen at runtime.
+     * All three method bodies are intentionally empty.
+     */
+    @WebService
+    @Path("/rootpath")
+    @Consumes("multipart/form-data")
+    @Produces("application/xml")
+    public static class UploadService {
+        @WebMethod
+        @POST
+        @Path("/path1")
+        @Consumes("multipart/form-data")
+        public void upload(@Multipart(value = "content", type = "application/octet-stream") java.lang.Number content,
+                           @Multipart(value = "name", type = "text/plain") String name) {
+        }
+
+        @WebMethod
+        @GET
+        @Path("/path2")
+        @Consumes("text/plain")
+        private void upload() {
+        }
+
+        @WebMethod
+        @POST
+        @Path("/path3")
+        @Consumes("multipart/form-data")
+        public void upload(@Multipart(value = "content", type = "application/octet-stream") InputStream content,
+                           @Multipart(value = "name", type = "text/plain") String name) {
+        }
+
+    }
+
+    // classpath resource uploaded by the test, and the part name sent along with it
+    private static final String SAMPLE_CONTENT_PATH = "/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncRelay.xml";
+    private static final String SAMPLE_NAME = "CxfRsSpringAsyncRelay.xml";
+    // counted down by CamelRouteBuilder once the second endpoint has stored content and name
+    private static final CountDownLatch LATCH = new CountDownLatch(1);
+    private static String content;
+    private static String name;
+
+    /**
+     * This test builds a route chaining two cxfrs endpoints. It shows a request
+     * sent to the first one will be correctly transferred and consumed by the
+     * other one.
+     *
+     * @throws Exception if starting or stopping the Camel main, or reading the sample resource, fails
+     */
+    @Test
+    public void testJaxrsAsyncRelayRoute() throws Exception {
+        final Main main = new Main();
+        try {
+            main.setApplicationContextUri("org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncRelay.xml");
+            main.start();
+            Thread t = new Thread(new Runnable() {
+                /**
+                 * Sends a request to the first endpoint in the route
+                 */
+                public void run() {
+                    try {
+                        JAXRSClientFactory.create("http://localhost:" + port6 + "/CxfRsAsyncRelayTest/rest", UploadService.class)
+                            .upload(CamelRouteBuilder.class.getResourceAsStream(SAMPLE_CONTENT_PATH),
+                                SAMPLE_NAME);
+                    } catch (Exception e) {
+                        log.warn("Error uploading to http://localhost:" + port6 + "/CxfRsAsyncRelayTest/rest", e);
+                    }
+                }
+            });
+            t.start();
+            // await() returns false on timeout; report that directly instead of failing later on stale fields
+            assertTrue("Timed out waiting for the message to reach the second endpoint", LATCH.await(10, TimeUnit.SECONDS));
+            assertEquals(SAMPLE_NAME, name);
+            StringWriter writer = new StringWriter();
+            IOUtils.copyAndCloseInput(new InputStreamReader(CamelRouteBuilder.class
+                .getResourceAsStream(SAMPLE_CONTENT_PATH)), writer);
+            assertEquals(writer.toString(), content);
+        } finally {
+            main.stop();
+        }
+    }
+
+    /**
+     * Route builder to be used with
+     * org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncRelay.xml
+     *
+     * It relays messages from {@code upload1} to {@code upload2Client} after removing the
+     * content-type header, and records what arrives at {@code upload2}.
+     */
+    public static class CamelRouteBuilder extends RouteBuilder {
+        @Override
+        public void configure() throws InterruptedException {
+            from("upload1").removeHeader(Exchange.CONTENT_TYPE).to("upload2Client");
+            from("upload2").process(new Processor() {
+                @Override
+                public void process(Exchange exchange) throws Exception {
+                    // once the message arrives in the second endpoint, store
+                    // the message components and signal that results can be compared
+                    content = exchange.getIn().getHeader("content", String.class);
+                    name = exchange.getIn().getHeader("name", String.class);
+                    LATCH.countDown();
+                }
+            });
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsBindingConfigurationSelectionTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsBindingConfigurationSelectionTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsBindingConfigurationSelectionTest.java
index 48054e3..1ce871b 100644
--- 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsBindingConfigurationSelectionTest.java
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsBindingConfigurationSelectionTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.cxf.jaxrs;
 import java.lang.reflect.Method;
 import java.util.Map;
 
+import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MultivaluedMap;
 
 import org.apache.camel.Exchange;
@@ -109,6 +110,11 @@ public class CxfRsBindingConfigurationSelectionTest extends CamelTestSupport {
         }
 
         @Override
+        public Entity<Object> bindCamelMessageToRequestEntity(Object body, Message camelMessage, Exchange camelExchange) {
+            return null;
+        }
+
+        @Override
         public Object bindCamelMessageBodyToRequestBody(Message camelMessage, Exchange camelExchange) throws Exception {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerTest.java
index 039859c..4c10298 100644
--- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerTest.java
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerTest.java
@@ -173,7 +173,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport {
     @Test
     public void testSuppressGetCustomerExceptionWithCxfRsEndpoint() {
         Exchange exchange 
-            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true&throwExceptionOnFailure=false", new Processor() {
+            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true&throwExceptionOnFailure=false&synchronous=true", new Processor() {
                 public void process(Exchange exchange) throws Exception {
                     exchange.setPattern(ExchangePattern.InOut);
                     Message message = exchange.getIn();
@@ -200,7 +200,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport {
     @Test
     public void testGetCustomerExceptionWithCxfRsEndpoint() {
         Exchange exchange 
-            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", new Processor() {
+            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true&synchronous=true", new Processor() {
                 public void process(Exchange exchange) throws Exception {
                     exchange.setPattern(ExchangePattern.InOut);
                     Message message = exchange.getIn();
@@ -280,7 +280,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport {
     @Test
     public void testAddCustomerUniqueResponseCodeWithHttpClientAPI() {
         Exchange exchange 
-            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true", new Processor() {
+            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true&synchronous=true", new Processor() {
                 public void process(Exchange exchange) throws Exception {
                     exchange.setPattern(ExchangePattern.InOut);
                     Message inMessage = exchange.getIn();
@@ -340,7 +340,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport {
     @Test
     public void testAddCustomerUniqueResponseCode() {
         Exchange exchange 
-            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true", new Processor() {
+            = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true&synchronous=true", new Processor() {
                 public void process(Exchange exchange) throws Exception {
                     exchange.setPattern(ExchangePattern.InOut);
                     Message inMessage = exchange.getIn();
@@ -368,7 +368,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport {
     
     @Test
     public void testProducerWithQueryParameters() {
-        Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13", new Processor() {
+        Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13&synchronous=true", new Processor() {
                 public void process(Exchange exchange) throws Exception {
                     exchange.setPattern(ExchangePattern.InOut);
                     Message inMessage = exchange.getIn();
@@ -388,7 +388,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport {
     
     @Test
     public void testProducerWithQueryParametersHeader() {
-        Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13", new Processor() {
+        Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13&synchronous=true", new Processor() {
                 public void process(Exchange exchange) throws Exception {
                     exchange.setPattern(ExchangePattern.InOut);
                     Message inMessage = exchange.getIn();
@@ -418,7 +418,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport {
     @Test    
     public void testRestServerDirectlyGetCustomer() {
         // we cannot convert directly to Customer as we need camel-jaxb
-        String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123",
+        String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123?synchronous=true",
                 null, Exchange.HTTP_METHOD, "GET", String.class);
         
         assertNotNull("The response should not be null ", response);
@@ -430,7 +430,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport {
         input.setName("Donald Duck");
 
         // we cannot convert directly to Customer as we need camel-jaxb
-        String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers",
+        String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers?synchronous=true",
                 input, Exchange.HTTP_METHOD, "POST", String.class);
 
         assertNotNull(response);
@@ -461,7 +461,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport {
     public void testProducerWithFeature() {
         TestFeature feature = context().getRegistry().lookupByNameAndType("testFeature", TestFeature.class);
         
-        template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123?features=#myFeatures",
+        template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123?features=#myFeatures&synchronous=true",
                 null, Exchange.HTTP_METHOD, "GET", String.class);
 
         assertTrue("The feature should be initialized", feature.initialized);
@@ -469,7 +469,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport {
 
     @Test
     public void testProducer422Response() {
-        Exchange exchange = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", new Processor() {
+        Exchange exchange = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true&synchronous=true", new Processor() {
             public void process(Exchange exchange) throws Exception {
                 exchange.setPattern(ExchangePattern.InOut);
                 Message message = exchange.getIn();

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsSslAsyncProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsSslAsyncProducerTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsSslAsyncProducerTest.java
new file mode 100644
index 0000000..042e0df
--- /dev/null
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsSslAsyncProducerTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.jaxrs;
+
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.cxf.CXFTestSupport;
+import org.apache.camel.component.cxf.common.message.CxfConstants;
+import org.apache.camel.component.cxf.jaxrs.testbean.Customer;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import static org.hamcrest.core.Is.is;
+
+public class CxfRsSslAsyncProducerTest extends CamelSpringTestSupport {
+    private static int port1 = CXFTestSupport.getSslPort();
+
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+
+    public int getPort1() {
+        return port1;
+    }
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {     
+        return new ClassPathXmlApplicationContext("org/apache/camel/component/cxf/jaxrs/CxfRsSpringSslAsyncProducer.xml");
+    }
+    
+    @Test
+    public void testCorrectTrustStore() {
+        Exchange exchange = template.send("direct://trust", new MyProcessor());
+     
+        // get the response message 
+        Customer response = (Customer) exchange.getOut().getBody();
+        
+        assertNotNull("The response should not be null ", response);
+        assertEquals("Get a wrong customer id ", String.valueOf(response.getId()), "123");
+        assertEquals("Get a wrong customer name", response.getName(), "John");
+        assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE));
+        assertEquals("Get a wrong header value", "value", exchange.getOut().getHeader("key"));
+    }
+
+    @Test
+    public void testNoTrustStore() {
+        Exchange exchange = template.send("direct://noTrust", new MyProcessor());
+        assertThat(exchange.isFailed(), is(true));
+        Exception e = exchange.getException();
+        assertThat(e.getCause().getClass().getCanonicalName(), is("javax.net.ssl.SSLHandshakeException"));
+    }
+
+    @Test
+    public void testWrongTrustStore() {
+        Exchange exchange = template.send("direct://wrongTrust", new MyProcessor());
+        assertThat(exchange.isFailed(), is(true));
+        Exception e = exchange.getException();
+        assertThat(e.getCause().getClass().getCanonicalName(), is("javax.net.ssl.SSLHandshakeException"));
+    }
+
+    private class MyProcessor implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            exchange.setPattern(ExchangePattern.InOut);
+            Message inMessage = exchange.getIn();
+            // set the Http method
+            inMessage.setHeader(Exchange.HTTP_METHOD, "GET");
+            // set the relative path
+            inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/123");
+            // Specify the response class , cxfrs will use InputStream as the response object type
+            inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class);
+            // set a customer header
+            inMessage.setHeader("key", "value");
+            // since we use the Get method, so we don't need to set the message body
+            inMessage.setBody(null);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml
new file mode 100644
index 0000000..83b97ae
--- /dev/null
+++ b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:cxf="http://camel.apache.org/schema/cxf"
+       xmlns:jaxrs="http://cxf.apache.org/jaxrs"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
+       http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
+       http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <jaxrs:server id="restService"
+                       address="http://localhost:${CXFTestSupport.port1}/CxfRsAsyncProducerTest/"
+                       staticSubresourceResolution="true">
+    <jaxrs:serviceBeans>
+      <ref bean="customerService"/>
+    </jaxrs:serviceBeans>       
+  </jaxrs:server>
+  
+  <bean id="customerService" class="org.apache.camel.component.cxf.jaxrs.testbean.CustomerService" />
+
+  <cxf:rsClient id="rsClientProxy" address="http://localhost:${CXFTestSupport.port1}/CxfRsAsyncProducerTest/"
+    serviceClass="org.apache.camel.component.cxf.jaxrs.testbean.CustomerService"
+    loggingFeatureEnabled="true" />
+   
+  <cxf:rsClient id="rsClientHttp" address="http://localhost:${CXFTestSupport.port1}/CxfRsAsyncProducerTest/"/>
+
+  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+    <endpoint id="fromEndpoint" uri="jetty://http://localhost:${CxfRsAsyncProducerTest.jetty}/CxfRsAsyncProducerTest/testQuery"/>
+    <route>
+       <from uri="direct://proxy"/>
+       <to uri="cxfrs://bean://rsClientProxy"/>
+    </route>
+    <route>
+       <from uri="direct://http"/>
+       <to uri="cxfrs://bean://rsClientHttp"/>
+    </route>
+    <route>
+       <from ref="fromEndpoint"/>
+       <process ref="myProcessor" />
+    </route>
+  </camelContext>
+
+  <bean id="myProcessor" class="org.apache.camel.component.cxf.jaxrs.CxfRsAsyncProducerTest$JettyProcessor"/>
+  <bean id ="testFeature" class="org.apache.camel.component.cxf.jaxrs.CxfRsAsyncProducerTest$TestFeature" />
+  
+  <util:list id="myFeatures" >
+     <ref bean="testFeature"/>
+  </util:list>
+  
+</beans>

http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducerSession.xml
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducerSession.xml b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducerSession.xml
new file mode 100644
index 0000000..9f27ffe
--- /dev/null
+++ b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducerSession.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:cxf="http://camel.apache.org/schema/cxf"
+       xmlns:jaxrs="http://cxf.apache.org/jaxrs"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
+       http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
+       http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+    <jaxrs:server id="restService"
+                  address="http://127.0.0.1:${CXFTestSupport.port1}/CxfRsProducerSessionTest/"
+                  staticSubresourceResolution="true">
+        <jaxrs:serviceBeans>
+            <ref bean="echoService"/>
+        </jaxrs:serviceBeans>
+    </jaxrs:server>
+
+    <bean id="echoService" class="org.apache.camel.component.cxf.jaxrs.testbean.EchoService" />
+
+    <cxf:rsClient id="rsClientProxy" address="http://127.0.0.1:${CXFTestSupport.port1}/CxfRsProducerSessionTest/"
+                  serviceClass="org.apache.camel.component.cxf.jaxrs.testbean.EchoService"
+                  loggingFeatureEnabled="true" />
+
+    <cxf:rsClient id="rsClientHttp" address="http://127.0.0.1:${CXFTestSupport.port1}/CxfRsProducerSessionTest/"/>
+
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <endpoint id="fromEndpoint" uri="jetty://http://127.0.0.1:${CxfRsProducerSessionTest.jetty}/CxfRsProducerSessionTest/echoservice"/>
+        <route>
+            <from uri="direct://proxy"/>
+            <to uri="cxfrs://bean://rsClientProxy"/>
+            <convertBodyTo type="java.lang.String"/>
+            <to uri="cxfrs://bean://rsClientProxy"/>
+        </route>
+        <route>
+            <from uri="direct://proxyinstance"/>
+            <to uri="cxfrs://bean://rsClientProxy?cookieHandler=#instanceCookieHandler"/>
+            <convertBodyTo type="java.lang.String"/>
+            <to uri="cxfrs://bean://rsClientProxy?cookieHandler=#instanceCookieHandler"/>
+        </route>
+        <route>
+            <from uri="direct://proxyexchange"/>
+            <to uri="cxfrs://bean://rsClientProxy?cookieHandler=#exchangeCookieHandler"/>
+            <convertBodyTo type="java.lang.String"/>
+            <to uri="cxfrs://bean://rsClientProxy?cookieHandler=#exchangeCookieHandler"/>
+        </route>
+        <route>
+            <from uri="direct://http"/>
+            <to uri="cxfrs://bean://rsClientHttp"/>
+            <convertBodyTo type="java.lang.String"/>
+            <to uri="cxfrs://bean://rsClientHttp"/>
+        </route>
+        <route>
+            <from uri="direct://httpinstance"/>
+            <to uri="cxfrs://bean://rsClientHttp?cookieHandler=#instanceCookieHandler"/>
+            <convertBodyTo type="java.lang.String"/>
+            <to uri="cxfrs://bean://rsClientHttp?cookieHandler=#instanceCookieHandler"/>
+        </route>
+        <route>
+            <from uri="direct://httpexchange"/>
+            <to uri="cxfrs://bean://rsClientHttp?cookieHandler=#exchangeCookieHandler"/>
+            <convertBodyTo type="java.lang.String"/>
+            <to uri="cxfrs://bean://rsClientHttp?cookieHandler=#exchangeCookieHandler"/>
+        </route>
+    </camelContext>
+
+    <bean id="instanceCookieHandler" class="org.apache.camel.http.common.cookie.InstanceCookieHandler"/>
+    <bean id="exchangeCookieHandler" class="org.apache.camel.http.common.cookie.ExchangeCookieHandler"/>
+</beans>

Reply via email to