CAMEL-10164: Add support for binding in rest to

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/737b49ca
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/737b49ca
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/737b49ca

Branch: refs/heads/master
Commit: 737b49ca80147fb7956e1188ae74b26ea8782eda
Parents: e097a5d
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon Sep 5 15:14:34 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Sep 5 16:26:13 2016 +0200

----------------------------------------------------------------------
 .../camel/component/rest/RestComponent.java     |   1 -
 .../camel/component/rest/RestEndpoint.java      |   4 +-
 .../camel/component/rest/RestProducer.java      | 155 ++++++-
 .../rest/RestProducerBindingProcessor.java      | 356 ++++++++++++++
 .../camel/model/rest/RestBindingDefinition.java |   6 +-
 .../processor/binding/RestBindingProcessor.java | 460 -------------------
 .../binding/RestConsumerBindingProcessor.java   | 460 +++++++++++++++++++
 .../JettyRestProducerPojoInOutTest.java         | 107 +++++
 .../src/test/resources/log4j2.properties        |   2 +-
 9 files changed, 1080 insertions(+), 471 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/737b49ca/camel-core/src/main/java/org/apache/camel/component/rest/RestComponent.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/rest/RestComponent.java 
b/camel-core/src/main/java/org/apache/camel/component/rest/RestComponent.java
index 4fb3d4e..5384b5f 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/rest/RestComponent.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/rest/RestComponent.java
@@ -22,7 +22,6 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.impl.UriEndpointComponent;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.URISupport;
 
 /**
  * Rest component.

http://git-wip-us.apache.org/repos/asf/camel/blob/737b49ca/camel-core/src/main/java/org/apache/camel/component/rest/RestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/rest/RestEndpoint.java 
b/camel-core/src/main/java/org/apache/camel/component/rest/RestEndpoint.java
index 671e27f..04b4ced 100644
--- a/camel-core/src/main/java/org/apache/camel/component/rest/RestEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/rest/RestEndpoint.java
@@ -29,7 +29,6 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.RestApiProcessorFactory;
 import org.apache.camel.spi.RestConfiguration;
 import org.apache.camel.spi.RestConsumerFactory;
 import org.apache.camel.spi.RestProducerFactory;
@@ -326,7 +325,8 @@ public class RestEndpoint extends DefaultEndpoint {
             } else {
                 producer = factory.createProducer(getCamelContext(), host, 
method, path, uriTemplate, queryParameters, consumes, produces, parameters);
             }
-            return new RestProducer(this, producer);
+            RestConfiguration config = 
getCamelContext().getRestConfiguration(cname, true);
+            return new RestProducer(this, producer, config);
         } else {
             throw new IllegalStateException("Cannot find RestProducerFactory 
in Registry or as a Component to use");
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/737b49ca/camel-core/src/main/java/org/apache/camel/component/rest/RestProducer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/rest/RestProducer.java 
b/camel-core/src/main/java/org/apache/camel/component/rest/RestProducer.java
index 9508ba5..97976d6 100644
--- a/camel-core/src/main/java/org/apache/camel/component/rest/RestProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/rest/RestProducer.java
@@ -17,16 +17,21 @@
 package org.apache.camel.component.rest;
 
 import java.net.URLDecoder;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.spi.DataFormat;
+import org.apache.camel.spi.RestConfiguration;
 import org.apache.camel.tools.apt.helper.CollectionStringBuffer;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
+import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.URISupport;
@@ -36,13 +41,21 @@ import org.apache.camel.util.URISupport;
  */
 public class RestProducer extends DefaultAsyncProducer {
 
+    private final CamelContext camelContext;
+    private final RestConfiguration configuration;
+    private String bindingMode;
+    private Boolean skipBindingOnErrorCode;
+
     // the producer of the Camel component that is used as the HTTP client to 
call the REST service
     private AsyncProcessor producer;
+    private AsyncProcessor binding;
 
     private boolean prepareUriTemplate = true;
 
-    public RestProducer(Endpoint endpoint, Producer producer) {
+    public RestProducer(Endpoint endpoint, Producer producer, 
RestConfiguration configuration) {
         super(endpoint);
+        this.camelContext = endpoint.getCamelContext();
+        this.configuration = configuration;
         this.producer = AsyncProcessorConverterHelper.convert(producer);
     }
 
@@ -51,9 +64,18 @@ public class RestProducer extends DefaultAsyncProducer {
         // TODO: request bind to consumes context-type
         // TODO: response bind to content-type returned in response
         // TODO: binding
+        // TODO: binding get type/outType from api-doc if possible
+        // TODO: binding reverse only enabled if outType configured
+        // TODO move consumer binding processor to this package so they are 
both in the same place
+
         try {
             prepareExchange(exchange);
-            return producer.process(exchange, callback);
+            if (binding != null) {
+                return binding.process(exchange, callback);
+            } else {
+                // no binding in use call the producer directly
+                return producer.process(exchange, callback);
+            }
         } catch (Throwable e) {
             exchange.setException(e);
             callback.done(true);
@@ -78,6 +100,22 @@ public class RestProducer extends DefaultAsyncProducer {
         this.prepareUriTemplate = prepareUriTemplate;
     }
 
+    public String getBindingMode() {
+        return bindingMode;
+    }
+
+    public void setBindingMode(String bindingMode) {
+        this.bindingMode = bindingMode;
+    }
+
+    public Boolean getSkipBindingOnErrorCode() {
+        return skipBindingOnErrorCode;
+    }
+
+    public void setSkipBindingOnErrorCode(Boolean skipBindingOnErrorCode) {
+        this.skipBindingOnErrorCode = skipBindingOnErrorCode;
+    }
+
     protected void prepareExchange(Exchange exchange) throws Exception {
         boolean hasPath = false;
 
@@ -160,13 +198,122 @@ public class RestProducer extends DefaultAsyncProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        ServiceHelper.startService(producer);
+
+        // create binding processor
+        binding = createBindingProcessor();
+
+        ServiceHelper.startServices(binding, producer);
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        ServiceHelper.stopService(producer);
+        ServiceHelper.stopServices(producer, binding);
+    }
+
+    protected AsyncProcessor createBindingProcessor() throws Exception {
+
+        // these options can be overridden per endpoint
+        String mode = configuration.getBindingMode().name();
+        if (bindingMode != null) {
+            mode = bindingMode;
+        }
+        boolean skip = configuration.isSkipBindingOnErrorCode();
+        if (skipBindingOnErrorCode != null) {
+            skip = skipBindingOnErrorCode;
+        }
+
+        if (mode == null || "off".equals(mode)) {
+            // binding mode is off
+            return null;
+        }
+
+        // setup json data format
+        String name = configuration.getJsonDataFormat();
+        if (name != null) {
+            // must only be a name, not refer to an existing instance
+            Object instance = camelContext.getRegistry().lookupByName(name);
+            if (instance != null) {
+                throw new IllegalArgumentException("JsonDataFormat name: " + 
name + " must not be an existing bean instance from the registry");
+            }
+        } else {
+            name = "json-jackson";
+        }
+        // this will create a new instance as the name was not already 
pre-created
+        DataFormat json = camelContext.resolveDataFormat(name);
+        DataFormat outJson = camelContext.resolveDataFormat(name);
+
+        // is json binding required?
+        if (mode.contains("json") && json == null) {
+            throw new IllegalArgumentException("JSon DataFormat " + name + " 
not found.");
+        }
+
+        if (json != null) {
+            setAdditionalConfiguration(configuration, camelContext, json, 
"json.in.");
+            setAdditionalConfiguration(configuration, camelContext, outJson, 
"json.out.");
+        }
+
+        // setup xml data format
+        name = configuration.getXmlDataFormat();
+        if (name != null) {
+            // must only be a name, not refer to an existing instance
+            Object instance = camelContext.getRegistry().lookupByName(name);
+            if (instance != null) {
+                throw new IllegalArgumentException("XmlDataFormat name: " + 
name + " must not be an existing bean instance from the registry");
+            }
+        } else {
+            name = "jaxb";
+        }
+        // this will create a new instance as the name was not already 
pre-created
+        DataFormat jaxb = camelContext.resolveDataFormat(name);
+        DataFormat outJaxb = camelContext.resolveDataFormat(name);
+
+        // is xml binding required?
+        if (mode.contains("xml") && jaxb == null) {
+            throw new IllegalArgumentException("XML DataFormat " + name + " 
not found.");
+        }
+
+        if (jaxb != null) {
+            setAdditionalConfiguration(configuration, camelContext, jaxb, 
"xml.in.");
+            setAdditionalConfiguration(configuration, camelContext, outJaxb, 
"xml.out.");
+        }
+
+        return new RestProducerBindingProcessor(producer, camelContext, json, 
jaxb, outJson, outJaxb, mode, skip);
+    }
+
+    private void setAdditionalConfiguration(RestConfiguration config, 
CamelContext context,
+                                            DataFormat dataFormat, String 
prefix) throws Exception {
+        if (config.getDataFormatProperties() != null && 
!config.getDataFormatProperties().isEmpty()) {
+            // must use a copy as otherwise the options get removed during 
introspection in setProperties
+            Map<String, Object> copy = new HashMap<String, Object>();
+
+            // filter keys on prefix
+            // - either it's a known prefix and must match the prefix parameter
+            // - or it's a common configuration that we should always use
+            for (Map.Entry<String, Object> entry : 
config.getDataFormatProperties().entrySet()) {
+                String key = entry.getKey();
+                String copyKey;
+                boolean known = isKeyKnownPrefix(key);
+                if (known) {
+                    // remove the prefix from the key to use
+                    copyKey = key.substring(prefix.length());
+                } else {
+                    // use the key as is
+                    copyKey = key;
+                }
+                if (!known || key.startsWith(prefix)) {
+                    copy.put(copyKey, entry.getValue());
+                }
+            }
+
+            // set reference properties first as they use # syntax that fools 
the regular properties setter
+            EndpointHelper.setReferenceProperties(context, dataFormat, copy);
+            EndpointHelper.setProperties(context, dataFormat, copy);
+        }
+    }
+
+    private boolean isKeyKnownPrefix(String key) {
+        return key.startsWith("json.in.") || key.startsWith("json.out.") || 
key.startsWith("xml.in.") || key.startsWith("xml.out.");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/737b49ca/camel-core/src/main/java/org/apache/camel/component/rest/RestProducerBindingProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/rest/RestProducerBindingProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/component/rest/RestProducerBindingProcessor.java
new file mode 100644
index 0000000..f17001d
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/component/rest/RestProducerBindingProcessor.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rest;
+
+import java.io.InputStream;
+import java.util.Locale;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.processor.MarshalProcessor;
+import org.apache.camel.processor.UnmarshalProcessor;
+import org.apache.camel.processor.binding.BindingException;
+import org.apache.camel.spi.DataFormat;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+public class RestProducerBindingProcessor extends DelegateAsyncProcessor {
+
+    private final CamelContext camelContext;
+    private final AsyncProcessor jsonUnmarshal;
+    private final AsyncProcessor xmlUnmarshal;
+    private final AsyncProcessor jsonMarshal;
+    private final AsyncProcessor xmlMarshal;
+    private final String bindingMode;
+    private final boolean skipBindingOnErrorCode;
+
+    public RestProducerBindingProcessor(AsyncProcessor processor, CamelContext 
camelContext,
+                                        DataFormat jsonDataFormat, DataFormat 
xmlDataFormat,
+                                        DataFormat outJsonDataFormat, 
DataFormat outXmlDataFormat,
+                                        String bindingMode, boolean 
skipBindingOnErrorCode) {
+
+        super(processor);
+
+        this.camelContext = camelContext;
+
+        if (jsonDataFormat != null) {
+            this.jsonUnmarshal = new UnmarshalProcessor(jsonDataFormat);
+        } else {
+            this.jsonUnmarshal = null;
+        }
+        if (outJsonDataFormat != null) {
+            this.jsonMarshal = new MarshalProcessor(outJsonDataFormat);
+        } else if (jsonDataFormat != null) {
+            this.jsonMarshal = new MarshalProcessor(jsonDataFormat);
+        } else {
+            this.jsonMarshal = null;
+        }
+
+        if (xmlDataFormat != null) {
+            this.xmlUnmarshal = new UnmarshalProcessor(xmlDataFormat);
+        } else {
+            this.xmlUnmarshal = null;
+        }
+        if (outXmlDataFormat != null) {
+            this.xmlMarshal = new MarshalProcessor(outXmlDataFormat);
+        } else if (xmlDataFormat != null) {
+            this.xmlMarshal = new MarshalProcessor(xmlDataFormat);
+        } else {
+            this.xmlMarshal = null;
+        }
+
+        this.bindingMode = bindingMode;
+        this.skipBindingOnErrorCode = skipBindingOnErrorCode;
+    }
+
+    @Override
+    public String toString() {
+        return "RestProducerBindingProcessor";
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // inject CamelContext before starting
+        if (jsonMarshal instanceof CamelContextAware) {
+            ((CamelContextAware) jsonMarshal).setCamelContext(camelContext);
+        }
+        if (jsonUnmarshal instanceof CamelContextAware) {
+            ((CamelContextAware) jsonUnmarshal).setCamelContext(camelContext);
+        }
+        if (xmlMarshal instanceof CamelContextAware) {
+            ((CamelContextAware) xmlMarshal).setCamelContext(camelContext);
+        }
+        if (xmlUnmarshal instanceof CamelContextAware) {
+            ((CamelContextAware) xmlUnmarshal).setCamelContext(camelContext);
+        }
+        ServiceHelper.startServices(jsonMarshal, jsonUnmarshal, xmlMarshal, 
xmlUnmarshal);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(jsonMarshal, jsonUnmarshal, xmlMarshal, 
xmlUnmarshal);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        boolean isXml = false;
+        boolean isJson = false;
+
+        // skip binding for empty/null body
+        Object body = exchange.getIn().getBody();
+        if (ObjectHelper.isEmpty(body)) {
+            // TODO: add reverse operation to call before callback
+            // okay now we can continue routing to the producer
+            return getProcessor().process(exchange, callback);
+        }
+
+        // we only need to perform binding if the message body is POJO based
+        // if it's convertible to stream based then it's not POJO based
+        InputStream is = 
camelContext.getTypeConverter().tryConvertTo(InputStream.class, exchange, body);
+        if (is != null) {
+            exchange.getIn().setBody(is);
+            // add reverse operation
+            exchange.addOnCompletion(new 
RestProducerBindingUnmarshalOnCompletion(jsonMarshal, xmlMarshal, false));
+            // okay now we can continue routing to the producer
+            return getProcessor().process(exchange, callback);
+        }
+
+        String contentType = ExchangeHelper.getContentType(exchange);
+        if (contentType != null) {
+            isXml = contentType.toLowerCase(Locale.ENGLISH).contains("xml");
+            isJson = contentType.toLowerCase(Locale.ENGLISH).contains("json");
+        }
+
+        // only allow xml/json if the binding mode allows that
+        isXml &= bindingMode.equals("auto") || bindingMode.contains("xml");
+        isJson &= bindingMode.equals("auto") || bindingMode.contains("json");
+
+        // if we do not yet know if its xml or json, then use the binding mode 
to know the mode
+        if (!isJson && !isXml) {
+            isXml = bindingMode.equals("auto") || bindingMode.contains("xml");
+            isJson = bindingMode.equals("auto") || 
bindingMode.contains("json");
+        }
+
+        // favor json over xml
+        if (isJson && jsonMarshal != null) {
+            // TODO: add reverse operation to call before callback
+            try {
+                jsonMarshal.process(exchange);
+            } catch (Exception e) {
+                // we failed so cannot call producer
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+            // need to prepare exchange first
+            ExchangeHelper.prepareOutToIn(exchange);
+            // add reverse operation
+            exchange.addOnCompletion(new 
RestProducerBindingUnmarshalOnCompletion(jsonMarshal, xmlMarshal, false));
+            // okay now we can continue routing to the producer
+            return getProcessor().process(exchange, callback);
+        } else if (isXml && xmlMarshal != null) {
+            // TODO: add reverse operation to call before callback
+            try {
+                xmlMarshal.process(exchange);
+            } catch (Exception e) {
+                // we failed so cannot call producer
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+            // need to prepare exchange first
+            ExchangeHelper.prepareOutToIn(exchange);
+            // add reverse operation
+            exchange.addOnCompletion(new 
RestProducerBindingUnmarshalOnCompletion(jsonMarshal, xmlMarshal, false));
+            // okay now we can continue routing to the producer
+            return getProcessor().process(exchange, callback);
+        }
+
+        // we could not bind
+        if ("off".equals(bindingMode) || bindingMode.equals("auto")) {
+            // add reverse operation
+            exchange.addOnCompletion(new 
RestProducerBindingUnmarshalOnCompletion(jsonMarshal, xmlMarshal, false));
+            // okay now we can continue routing to the producer
+            return getProcessor().process(exchange, callback);
+        } else {
+            if (bindingMode.contains("xml")) {
+                exchange.setException(new BindingException("Cannot bind to xml 
as message body is not xml compatible", exchange));
+            } else {
+                exchange.setException(new BindingException("Cannot bind to 
json as message body is not json compatible", exchange));
+            }
+            // we failed so cannot call producer
+            callback.done(true);
+            return true;
+        }
+    }
+
+    private final class RestProducerBindingUnmarshalOnCompletion extends 
SynchronizationAdapter {
+
+        private final AsyncProcessor jsonMarshal;
+        private final AsyncProcessor xmlMarshal;
+        private boolean wasXml;
+
+        private RestProducerBindingUnmarshalOnCompletion(AsyncProcessor 
jsonMarshal, AsyncProcessor xmlMarshal, boolean wasXml) {
+            this.jsonMarshal = jsonMarshal;
+            this.xmlMarshal = xmlMarshal;
+            this.wasXml = wasXml;
+        }
+
+        @Override
+        public void onDone(Exchange exchange) {
+            // only unmarshal if there was no exception
+            if (exchange.getException() != null) {
+                return;
+            }
+
+            if (skipBindingOnErrorCode) {
+                Integer code = exchange.hasOut() ? 
exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class) : 
exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
+                // if there is a custom http error code then skip binding
+                if (code != null && code >= 300) {
+                    return;
+                }
+            }
+
+            boolean isXml = false;
+            boolean isJson = false;
+
+            // fallback to content type if still undecided
+            if (!isXml && !isJson) {
+                String contentType = ExchangeHelper.getContentType(exchange);
+                if (contentType != null) {
+                    isXml = 
contentType.toLowerCase(Locale.ENGLISH).contains("xml");
+                    isJson = 
contentType.toLowerCase(Locale.ENGLISH).contains("json");
+                }
+            }
+            // if content type could not tell us if it was json or xml, then 
fallback to if the binding was configured with
+            // that information in the consumes
+            if (!isXml && !isJson) {
+                // TODO:
+//                isXml = produces != null && 
produces.toLowerCase(Locale.ENGLISH).contains("xml");
+//                isJson = produces != null && 
produces.toLowerCase(Locale.ENGLISH).contains("json");
+            }
+
+            // only allow xml/json if the binding mode allows that (when off 
we still want to know if its xml or json)
+            if (bindingMode != null) {
+                isXml &= bindingMode.equals("off") || 
bindingMode.equals("auto") || bindingMode.contains("xml");
+                isJson &= bindingMode.equals("off") || 
bindingMode.equals("auto") || bindingMode.contains("json");
+
+                // if we do not yet know if its xml or json, then use the 
binding mode to know the mode
+                if (!isJson && !isXml) {
+                    isXml = bindingMode.equals("auto") || 
bindingMode.contains("xml");
+                    isJson = bindingMode.equals("auto") || 
bindingMode.contains("json");
+                }
+            }
+
+            // in case we have not yet been able to determine if xml or json, 
then use the same as in the unmarshaller
+            if (isXml && isJson) {
+                isXml = wasXml;
+                isJson = !wasXml;
+            }
+
+            // need to prepare exchange first
+            ExchangeHelper.prepareOutToIn(exchange);
+
+            // ensure there is a content type header (even if binding is off)
+            // TODO:
+            // ensureHeaderContentType(produces, isXml, isJson, exchange);
+
+            if (bindingMode == null || "off".equals(bindingMode)) {
+                // binding is off, so no message body binding
+                return;
+            }
+
+            // is there any marshaller at all
+            if (jsonMarshal == null && xmlMarshal == null) {
+                return;
+            }
+
+            // is the body empty
+            if ((exchange.hasOut() && exchange.getOut().getBody() == null) || 
(!exchange.hasOut() && exchange.getIn().getBody() == null)) {
+                return;
+            }
+
+            String contentType = 
exchange.getIn().getHeader(Exchange.CONTENT_TYPE, String.class);
+            // need to lower-case so the contains check below can match if 
using upper case
+            contentType = contentType.toLowerCase(Locale.US);
+            try {
+                // favor json over xml
+                if (isJson && jsonUnmarshal != null) {
+                    // only unmarshal if it's a json content type
+                    if (contentType.contains("json")) {
+                        jsonUnmarshal.process(exchange);
+                    }
+                } else if (isXml && xmlUnmarshal != null) {
+                    // only unmarshal if it's an xml content type
+                    if (contentType.contains("xml")) {
+                        xmlUnmarshal.process(exchange);
+                    }
+                } else {
+                    // we could not bind
+                    if (bindingMode.equals("auto")) {
+                        // okay for auto we do not mind if we could not bind
+                    } else {
+                        if (bindingMode.contains("xml")) {
+                            exchange.setException(new BindingException("Cannot 
bind to xml as message body is not xml compatible", exchange));
+                        } else {
+                            exchange.setException(new BindingException("Cannot 
bind to json as message body is not json compatible", exchange));
+                        }
+                    }
+                }
+            } catch (Throwable e) {
+                exchange.setException(e);
+            }
+        }
+
+        private void ensureHeaderContentType(String contentType, boolean 
isXml, boolean isJson, Exchange exchange) {
+            // favor given content type
+            if (contentType != null) {
+                String type = ExchangeHelper.getContentType(exchange);
+                if (type == null) {
+                    exchange.getIn().setHeader(Exchange.CONTENT_TYPE, 
contentType);
+                }
+            }
+
+            // favor json over xml
+            if (isJson) {
+                // make sure there is a content-type with json
+                String type = ExchangeHelper.getContentType(exchange);
+                if (type == null) {
+                    exchange.getIn().setHeader(Exchange.CONTENT_TYPE, 
"application/json");
+                }
+            } else if (isXml) {
+                // make sure there is a content-type with xml
+                String type = ExchangeHelper.getContentType(exchange);
+                if (type == null) {
+                    exchange.getIn().setHeader(Exchange.CONTENT_TYPE, 
"application/xml");
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "RestProducerBindingUnmarshalOnCompletion";
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/737b49ca/camel-core/src/main/java/org/apache/camel/model/rest/RestBindingDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/rest/RestBindingDefinition.java
 
b/camel-core/src/main/java/org/apache/camel/model/rest/RestBindingDefinition.java
index a835a35..e943d2d 100644
--- 
a/camel-core/src/main/java/org/apache/camel/model/rest/RestBindingDefinition.java
+++ 
b/camel-core/src/main/java/org/apache/camel/model/rest/RestBindingDefinition.java
@@ -28,7 +28,7 @@ import javax.xml.bind.annotation.XmlTransient;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.model.NoOutputDefinition;
-import org.apache.camel.processor.binding.RestBindingProcessor;
+import org.apache.camel.processor.binding.RestConsumerBindingProcessor;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RestConfiguration;
@@ -105,7 +105,7 @@ public class RestBindingDefinition extends 
NoOutputDefinition<RestBindingDefinit
 
         if (mode == null || "off".equals(mode)) {
             // binding mode is off, so create a off mode binding processor
-            return new RestBindingProcessor(context, null, null, null, null, 
consumes, produces, mode, skip, cors, corsHeaders, defaultValues);
+            return new RestConsumerBindingProcessor(context, null, null, null, 
null, consumes, produces, mode, skip, cors, corsHeaders, defaultValues);
         }
 
         // setup json data format
@@ -200,7 +200,7 @@ public class RestBindingDefinition extends 
NoOutputDefinition<RestBindingDefinit
             setAdditionalConfiguration(config, context, outJaxb, "xml.out.");
         }
 
-        return new RestBindingProcessor(context, json, jaxb, outJson, outJaxb, 
consumes, produces, mode, skip, cors, corsHeaders, defaultValues);
+        return new RestConsumerBindingProcessor(context, json, jaxb, outJson, 
outJaxb, consumes, produces, mode, skip, cors, corsHeaders, defaultValues);
     }
 
     private void setAdditionalConfiguration(RestConfiguration config, 
CamelContext context,

http://git-wip-us.apache.org/repos/asf/camel/blob/737b49ca/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java
deleted file mode 100644
index 5614797..0000000
--- 
a/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor.binding;
-
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Route;
-import org.apache.camel.processor.MarshalProcessor;
-import org.apache.camel.processor.UnmarshalProcessor;
-import org.apache.camel.spi.DataFormat;
-import org.apache.camel.spi.RestConfiguration;
-import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.support.SynchronizationAdapter;
-import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.MessageHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
-
-/**
- * A {@link org.apache.camel.Processor} that binds the REST DSL incoming and 
outgoing messages
- * from sources of json or xml to Java Objects.
- * <p/>
- * The binding uses {@link org.apache.camel.spi.DataFormat} for the actual 
work to transform
- * from xml/json to Java Objects and reverse again.
- */
-public class RestBindingProcessor extends ServiceSupport implements 
AsyncProcessor {
-
-    private final CamelContext camelContext;
-    private final AsyncProcessor jsonUnmarshal;
-    private final AsyncProcessor xmlUnmarshal;
-    private final AsyncProcessor jsonMarshal;
-    private final AsyncProcessor xmlMarshal;
-    private final String consumes;
-    private final String produces;
-    private final String bindingMode;
-    private final boolean skipBindingOnErrorCode;
-    private final boolean enableCORS;
-    private final Map<String, String> corsHeaders;
-    private final Map<String, String> queryDefaultValues;
-
-    public RestBindingProcessor(CamelContext camelContext, DataFormat 
jsonDataFormat, DataFormat xmlDataFormat,
-                                DataFormat outJsonDataFormat, DataFormat 
outXmlDataFormat,
-                                String consumes, String produces, String 
bindingMode,
-                                boolean skipBindingOnErrorCode, boolean 
enableCORS,
-                                Map<String, String> corsHeaders,
-                                Map<String, String> queryDefaultValues) {
-
-        this.camelContext = camelContext;
-
-        if (jsonDataFormat != null) {
-            this.jsonUnmarshal = new UnmarshalProcessor(jsonDataFormat);
-        } else {
-            this.jsonUnmarshal = null;
-        }
-        if (outJsonDataFormat != null) {
-            this.jsonMarshal = new MarshalProcessor(outJsonDataFormat);
-        } else if (jsonDataFormat != null) {
-            this.jsonMarshal = new MarshalProcessor(jsonDataFormat);
-        } else {
-            this.jsonMarshal = null;
-        }
-
-        if (xmlDataFormat != null) {
-            this.xmlUnmarshal = new UnmarshalProcessor(xmlDataFormat);
-        } else {
-            this.xmlUnmarshal = null;
-        }
-        if (outXmlDataFormat != null) {
-            this.xmlMarshal = new MarshalProcessor(outXmlDataFormat);
-        } else if (xmlDataFormat != null) {
-            this.xmlMarshal = new MarshalProcessor(xmlDataFormat);
-        } else {
-            this.xmlMarshal = null;
-        }
-
-        this.consumes = consumes;
-        this.produces = produces;
-        this.bindingMode = bindingMode;
-        this.skipBindingOnErrorCode = skipBindingOnErrorCode;
-        this.enableCORS = enableCORS;
-        this.corsHeaders = corsHeaders;
-        this.queryDefaultValues = queryDefaultValues;
-    }
-
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    @Override
-    public boolean process(Exchange exchange, final AsyncCallback callback) {
-        if (enableCORS) {
-            exchange.addOnCompletion(new 
RestBindingCORSOnCompletion(corsHeaders));
-        }
-
-        String method = exchange.getIn().getHeader(Exchange.HTTP_METHOD, 
String.class);
-        if ("OPTIONS".equalsIgnoreCase(method)) {
-            // for OPTIONS methods then we should not route at all as its part 
of CORS
-            exchange.setProperty(Exchange.ROUTE_STOP, true);
-            callback.done(true);
-            return true;
-        }
-
-        boolean isXml = false;
-        boolean isJson = false;
-
-        String contentType = ExchangeHelper.getContentType(exchange);
-        if (contentType != null) {
-            isXml = contentType.toLowerCase(Locale.ENGLISH).contains("xml");
-            isJson = contentType.toLowerCase(Locale.ENGLISH).contains("json");
-        }
-        // if content type could not tell us if it was json or xml, then 
fallback to if the binding was configured with
-        // that information in the consumes
-        if (!isXml && !isJson) {
-            isXml = consumes != null && 
consumes.toLowerCase(Locale.ENGLISH).contains("xml");
-            isJson = consumes != null && 
consumes.toLowerCase(Locale.ENGLISH).contains("json");
-        }
-
-        // only allow xml/json if the binding mode allows that
-        isXml &= bindingMode.equals("auto") || bindingMode.contains("xml");
-        isJson &= bindingMode.equals("auto") || bindingMode.contains("json");
-
-        // if we do not yet know if its xml or json, then use the binding mode 
to know the mode
-        if (!isJson && !isXml) {
-            isXml = bindingMode.equals("auto") || bindingMode.contains("xml");
-            isJson = bindingMode.equals("auto") || 
bindingMode.contains("json");
-        }
-
-        String accept = exchange.getIn().getHeader("Accept", String.class);
-
-        String body = null;
-        if (exchange.getIn().getBody() != null) {
-
-           // okay we have a binding mode, so need to check for empty body as 
that can cause the marshaller to fail
-            // as they assume a non-empty body
-            if (isXml || isJson) {
-                // we have binding enabled, so we need to know if there body 
is empty or not
-                // so force reading the body as a String which we can work with
-                body = MessageHelper.extractBodyAsString(exchange.getIn());
-                if (body != null) {
-                    exchange.getIn().setBody(body);
-
-                    if (isXml && isJson) {
-                        // we have still not determined between xml or json, 
so check the body if its xml based or not
-                        isXml = body.startsWith("<");
-                        isJson = !isXml;
-                    }
-                }
-            }
-        }
-
-        // add missing default values which are mapped as headers
-        if (queryDefaultValues != null) {
-            for (Map.Entry<String, String> entry : 
queryDefaultValues.entrySet()) {
-                if (exchange.getIn().getHeader(entry.getKey()) == null) {
-                    exchange.getIn().setHeader(entry.getKey(), 
entry.getValue());
-                }
-            }
-        }
-
-        // favor json over xml
-        if (isJson && jsonUnmarshal != null) {
-            // add reverse operation
-            exchange.addOnCompletion(new 
RestBindingMarshalOnCompletion(exchange.getFromRouteId(), jsonMarshal, 
xmlMarshal, false, accept));
-            if (ObjectHelper.isNotEmpty(body)) {
-                return jsonUnmarshal.process(exchange, callback);
-            } else {
-                callback.done(true);
-                return true;
-            }
-        } else if (isXml && xmlUnmarshal != null) {
-            // add reverse operation
-            exchange.addOnCompletion(new 
RestBindingMarshalOnCompletion(exchange.getFromRouteId(), jsonMarshal, 
xmlMarshal, true, accept));
-            if (ObjectHelper.isNotEmpty(body)) {
-                return xmlUnmarshal.process(exchange, callback);
-            } else {
-                callback.done(true);
-                return true;
-            }
-        }
-
-        // we could not bind
-        if ("off".equals(bindingMode) || bindingMode.equals("auto")) {
-            // okay for auto we do not mind if we could not bind
-            exchange.addOnCompletion(new 
RestBindingMarshalOnCompletion(exchange.getFromRouteId(), jsonMarshal, 
xmlMarshal, false, accept));
-            callback.done(true);
-            return true;
-        } else {
-            if (bindingMode.contains("xml")) {
-                exchange.setException(new BindingException("Cannot bind to xml 
as message body is not xml compatible", exchange));
-            } else {
-                exchange.setException(new BindingException("Cannot bind to 
json as message body is not json compatible", exchange));
-            }
-            callback.done(true);
-            return true;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "RestBindingProcessor";
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        // inject CamelContext before starting
-        if (jsonMarshal instanceof CamelContextAware) {
-            ((CamelContextAware) jsonMarshal).setCamelContext(camelContext);
-        }
-        if (jsonUnmarshal instanceof CamelContextAware) {
-            ((CamelContextAware) jsonUnmarshal).setCamelContext(camelContext);
-        }
-        if (xmlMarshal instanceof CamelContextAware) {
-            ((CamelContextAware) xmlMarshal).setCamelContext(camelContext);
-        }
-        if (xmlUnmarshal instanceof CamelContextAware) {
-            ((CamelContextAware) xmlUnmarshal).setCamelContext(camelContext);
-        }
-        ServiceHelper.startServices(jsonMarshal, jsonUnmarshal, xmlMarshal, 
xmlUnmarshal);
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        ServiceHelper.stopServices(jsonMarshal, jsonUnmarshal, xmlMarshal, 
xmlUnmarshal);
-    }
-
-    /**
-     * An {@link org.apache.camel.spi.Synchronization} that does the reverse 
operation
-     * of marshalling from POJO to json/xml
-     */
-    private final class RestBindingMarshalOnCompletion extends 
SynchronizationAdapter {
-
-        private final AsyncProcessor jsonMarshal;
-        private final AsyncProcessor xmlMarshal;
-        private final String routeId;
-        private boolean wasXml;
-        private String accept;
-
-        private RestBindingMarshalOnCompletion(String routeId, AsyncProcessor 
jsonMarshal, AsyncProcessor xmlMarshal, boolean wasXml, String accept) {
-            this.routeId = routeId;
-            this.jsonMarshal = jsonMarshal;
-            this.xmlMarshal = xmlMarshal;
-            this.wasXml = wasXml;
-            this.accept = accept;
-        }
-
-        @Override
-        public void onAfterRoute(Route route, Exchange exchange) {
-            // we use the onAfterRoute callback, to ensure the data has been 
marshalled before
-            // the consumer writes the response back
-
-            // only trigger when it was the 1st route that was done
-            if (!routeId.equals(route.getId())) {
-                return;
-            }
-
-            // only marshal if there was no exception
-            if (exchange.getException() != null) {
-                return;
-            }
-
-            if (skipBindingOnErrorCode) {
-                Integer code = exchange.hasOut() ? 
exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class) : 
exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
-                // if there is a custom http error code then skip binding
-                if (code != null && code >= 300) {
-                    return;
-                }
-            }
-
-            boolean isXml = false;
-            boolean isJson = false;
-
-            // accept takes precedence
-            if (accept != null) {
-                isXml = accept.toLowerCase(Locale.ENGLISH).contains("xml");
-                isJson = accept.toLowerCase(Locale.ENGLISH).contains("json");
-            }
-            // fallback to content type if still undecided
-            if (!isXml && !isJson) {
-                String contentType = ExchangeHelper.getContentType(exchange);
-                if (contentType != null) {
-                    isXml = 
contentType.toLowerCase(Locale.ENGLISH).contains("xml");
-                    isJson = 
contentType.toLowerCase(Locale.ENGLISH).contains("json");
-                }
-            }
-            // if content type could not tell us if it was json or xml, then 
fallback to if the binding was configured with
-            // that information in the consumes
-            if (!isXml && !isJson) {
-                isXml = produces != null && 
produces.toLowerCase(Locale.ENGLISH).contains("xml");
-                isJson = produces != null && 
produces.toLowerCase(Locale.ENGLISH).contains("json");
-            }
-
-            // only allow xml/json if the binding mode allows that (when off 
we still want to know if its xml or json)
-            if (bindingMode != null) {
-                isXml &= bindingMode.equals("off") || 
bindingMode.equals("auto") || bindingMode.contains("xml");
-                isJson &= bindingMode.equals("off") || 
bindingMode.equals("auto") || bindingMode.contains("json");
-
-                // if we do not yet know if its xml or json, then use the 
binding mode to know the mode
-                if (!isJson && !isXml) {
-                    isXml = bindingMode.equals("auto") || 
bindingMode.contains("xml");
-                    isJson = bindingMode.equals("auto") || 
bindingMode.contains("json");
-                }
-            }
-
-            // in case we have not yet been able to determine if xml or json, 
then use the same as in the unmarshaller
-            if (isXml && isJson) {
-                isXml = wasXml;
-                isJson = !wasXml;
-            }
-
-            // need to prepare exchange first
-            ExchangeHelper.prepareOutToIn(exchange);
-
-            // ensure there is a content type header (even if binding is off)
-            ensureHeaderContentType(produces, isXml, isJson, exchange);
-
-            if (bindingMode == null || "off".equals(bindingMode)) {
-                // binding is off, so no message body binding
-                return;
-            }
-
-            // is there any marshaller at all
-            if (jsonMarshal == null && xmlMarshal == null) {
-                return;
-            }
-
-            // is the body empty
-            if ((exchange.hasOut() && exchange.getOut().getBody() == null) || 
(!exchange.hasOut() && exchange.getIn().getBody() == null)) {
-                return;
-            }
-
-            String contentType = 
exchange.getIn().getHeader(Exchange.CONTENT_TYPE, String.class);
-            // need to lower-case so the contains check below can match if 
using upper case
-            contentType = contentType.toLowerCase(Locale.US);
-            try {
-                // favor json over xml
-                if (isJson && jsonMarshal != null) {
-                    // only marshal if its json content type
-                    if (contentType.contains("json")) {
-                        jsonMarshal.process(exchange);
-                    }
-                } else if (isXml && xmlMarshal != null) {
-                    // only marshal if its xml content type
-                    if (contentType.contains("xml")) {
-                        xmlMarshal.process(exchange);
-                    }
-                } else {
-                    // we could not bind
-                    if (bindingMode.equals("auto")) {
-                        // okay for auto we do not mind if we could not bind
-                    } else {
-                        if (bindingMode.contains("xml")) {
-                            exchange.setException(new BindingException("Cannot 
bind to xml as message body is not xml compatible", exchange));
-                        } else {
-                            exchange.setException(new BindingException("Cannot 
bind to json as message body is not json compatible", exchange));
-                        }
-                    }
-                }
-            } catch (Throwable e) {
-                exchange.setException(e);
-            }
-        }
-
-        private void ensureHeaderContentType(String contentType, boolean 
isXml, boolean isJson, Exchange exchange) {
-            // favor given content type
-            if (contentType != null) {
-                String type = ExchangeHelper.getContentType(exchange);
-                if (type == null) {
-                    exchange.getIn().setHeader(Exchange.CONTENT_TYPE, 
contentType);
-                }
-            }
-
-            // favor json over xml
-            if (isJson) {
-                // make sure there is a content-type with json
-                String type = ExchangeHelper.getContentType(exchange);
-                if (type == null) {
-                    exchange.getIn().setHeader(Exchange.CONTENT_TYPE, 
"application/json");
-                }
-            } else if (isXml) {
-                // make sure there is a content-type with xml
-                String type = ExchangeHelper.getContentType(exchange);
-                if (type == null) {
-                    exchange.getIn().setHeader(Exchange.CONTENT_TYPE, 
"application/xml");
-                }
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "RestBindingMarshalOnCompletion";
-        }
-    }
-
-    private final class RestBindingCORSOnCompletion extends 
SynchronizationAdapter {
-
-        private final Map<String, String> corsHeaders;
-
-        private RestBindingCORSOnCompletion(Map<String, String> corsHeaders) {
-            this.corsHeaders = corsHeaders;
-        }
-
-        @Override
-        public void onAfterRoute(Route route, Exchange exchange) {
-            // add the CORS headers after routing, but before the consumer 
writes the response
-            Message msg = exchange.hasOut() ? exchange.getOut() : 
exchange.getIn();
-
-            // use default value if none has been configured
-            String allowOrigin = corsHeaders != null ? 
corsHeaders.get("Access-Control-Allow-Origin") : null;
-            if (allowOrigin == null) {
-                allowOrigin = 
RestConfiguration.CORS_ACCESS_CONTROL_ALLOW_ORIGIN;
-            }
-            String allowMethods = corsHeaders != null ? 
corsHeaders.get("Access-Control-Allow-Methods") : null;
-            if (allowMethods == null) {
-                allowMethods = 
RestConfiguration.CORS_ACCESS_CONTROL_ALLOW_METHODS;
-            }
-            String allowHeaders = corsHeaders != null ? 
corsHeaders.get("Access-Control-Allow-Headers") : null;
-            if (allowHeaders == null) {
-                allowHeaders = 
RestConfiguration.CORS_ACCESS_CONTROL_ALLOW_HEADERS;
-            }
-            String maxAge = corsHeaders != null ? 
corsHeaders.get("Access-Control-Max-Age") : null;
-            if (maxAge == null) {
-                maxAge = RestConfiguration.CORS_ACCESS_CONTROL_MAX_AGE;
-            }
-
-            msg.setHeader("Access-Control-Allow-Origin", allowOrigin);
-            msg.setHeader("Access-Control-Allow-Methods", allowMethods);
-            msg.setHeader("Access-Control-Allow-Headers", allowHeaders);
-            msg.setHeader("Access-Control-Max-Age", maxAge);
-        }
-
-        @Override
-        public String toString() {
-            return "RestBindingCORSOnCompletion";
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/737b49ca/camel-core/src/main/java/org/apache/camel/processor/binding/RestConsumerBindingProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/binding/RestConsumerBindingProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/binding/RestConsumerBindingProcessor.java
new file mode 100644
index 0000000..d9f0656
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/binding/RestConsumerBindingProcessor.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.binding;
+
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Route;
+import org.apache.camel.processor.MarshalProcessor;
+import org.apache.camel.processor.UnmarshalProcessor;
+import org.apache.camel.spi.DataFormat;
+import org.apache.camel.spi.RestConfiguration;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.Processor} that binds the REST DSL incoming and 
outgoing messages
+ * from sources of json or xml to Java Objects.
+ * <p/>
+ * The binding uses {@link org.apache.camel.spi.DataFormat} for the actual 
work to transform
+ * from xml/json to Java Objects and reverse again.
+ */
+public class RestConsumerBindingProcessor extends ServiceSupport implements 
AsyncProcessor {
+
+    private final CamelContext camelContext;
+    private final AsyncProcessor jsonUnmarshal;
+    private final AsyncProcessor xmlUnmarshal;
+    private final AsyncProcessor jsonMarshal;
+    private final AsyncProcessor xmlMarshal;
+    private final String consumes;
+    private final String produces;
+    private final String bindingMode;
+    private final boolean skipBindingOnErrorCode;
+    private final boolean enableCORS;
+    private final Map<String, String> corsHeaders;
+    private final Map<String, String> queryDefaultValues;
+
+    /**
+     * Creates the binding processor and wraps the given data formats in marshal/unmarshal processors.
+     * <p/>
+     * Incoming payloads are unmarshalled with the json/xml data format only. Outgoing payloads are
+     * marshalled with the dedicated <tt>out</tt> data format when one is given, otherwise with the
+     * incoming data format. A side without any data format gets no processor (<tt>null</tt>).
+     *
+     * @param camelContext           the context injected into the processors when this service starts
+     * @param jsonDataFormat         data format for incoming json, may be <tt>null</tt>
+     * @param xmlDataFormat          data format for incoming xml, may be <tt>null</tt>
+     * @param outJsonDataFormat      data format for outgoing json, may be <tt>null</tt>
+     * @param outXmlDataFormat       data format for outgoing xml, may be <tt>null</tt>
+     * @param consumes               fallback used to detect json/xml on the request when the content type does not tell
+     * @param produces               fallback used to detect json/xml on the response
+     * @param bindingMode            the binding mode, such as <tt>off</tt>, <tt>auto</tt> or one containing json and/or xml
+     * @param skipBindingOnErrorCode whether to skip response binding when a http response code of 300 or above is set
+     * @param enableCORS             whether CORS headers are added to the response
+     * @param corsHeaders            custom CORS header values, may be <tt>null</tt>
+     * @param queryDefaultValues     default values added as headers when the header is missing, may be <tt>null</tt>
+     */
+    public RestConsumerBindingProcessor(CamelContext camelContext, DataFormat jsonDataFormat, DataFormat xmlDataFormat,
+                                        DataFormat outJsonDataFormat, DataFormat outXmlDataFormat,
+                                        String consumes, String produces, String bindingMode,
+                                        boolean skipBindingOnErrorCode, boolean enableCORS,
+                                        Map<String, String> corsHeaders,
+                                        Map<String, String> queryDefaultValues) {
+
+        this.camelContext = camelContext;
+
+        // json: unmarshal with the incoming format, marshal with the out format if present
+        final DataFormat jsonOut = outJsonDataFormat != null ? outJsonDataFormat : jsonDataFormat;
+        this.jsonUnmarshal = jsonDataFormat != null ? new UnmarshalProcessor(jsonDataFormat) : null;
+        this.jsonMarshal = jsonOut != null ? new MarshalProcessor(jsonOut) : null;
+
+        // xml: same rule as for json
+        final DataFormat xmlOut = outXmlDataFormat != null ? outXmlDataFormat : xmlDataFormat;
+        this.xmlUnmarshal = xmlDataFormat != null ? new UnmarshalProcessor(xmlDataFormat) : null;
+        this.xmlMarshal = xmlOut != null ? new MarshalProcessor(xmlOut) : null;
+
+        this.consumes = consumes;
+        this.produces = produces;
+        this.bindingMode = bindingMode;
+        this.skipBindingOnErrorCode = skipBindingOnErrorCode;
+        this.enableCORS = enableCORS;
+        this.corsHeaders = corsHeaders;
+        this.queryDefaultValues = queryDefaultValues;
+    }
+
+    /**
+     * Synchronous entry point which delegates to the asynchronous
+     * {@link #process(Exchange, AsyncCallback)} through {@link AsyncProcessorHelper}.
+     *
+     * @param exchange the exchange to bind
+     * @throws Exception is propagated from the helper
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    /**
+     * Binds the incoming REST request before it is routed.
+     * <p/>
+     * Registers the CORS on-completion when CORS is enabled and stops routing for <tt>OPTIONS</tt>
+     * requests. Otherwise it decides between json and xml (content type, then the configured
+     * <tt>consumes</tt>, then the binding mode, and finally whether the body starts with <tt>&lt;</tt>),
+     * adds missing query default values as headers, and unmarshals a non-empty body with the matching
+     * unmarshal processor. An on-completion doing the reverse marshalling is registered whenever a
+     * matching unmarshal processor exists or the mode is off/auto.
+     * <p/>
+     * A <tt>null</tt> binding mode is handled the same as <tt>off</tt>, so it no longer causes a
+     * {@link NullPointerException} that would leave the callback uncalled.
+     *
+     * @param exchange the exchange holding the incoming request
+     * @param callback called when processing is done
+     * @return <tt>true</tt> when completed in this method, otherwise the result of the unmarshal processor
+     */
+    @Override
+    public boolean process(Exchange exchange, final AsyncCallback callback) {
+        if (enableCORS) {
+            exchange.addOnCompletion(new RestBindingCORSOnCompletion(corsHeaders));
+        }
+
+        String method = exchange.getIn().getHeader(Exchange.HTTP_METHOD, String.class);
+        if ("OPTIONS".equalsIgnoreCase(method)) {
+            // for OPTIONS methods we should not route at all, as it is part of CORS
+            exchange.setProperty(Exchange.ROUTE_STOP, true);
+            callback.done(true);
+            return true;
+        }
+
+        // the binding mode may be null (the definition has a branch for it), which means the same as off
+        final String mode = bindingMode != null ? bindingMode : "off";
+        final boolean xmlAllowed = mode.equals("auto") || mode.contains("xml");
+        final boolean jsonAllowed = mode.equals("auto") || mode.contains("json");
+
+        boolean isXml = false;
+        boolean isJson = false;
+
+        String contentType = ExchangeHelper.getContentType(exchange);
+        if (contentType != null) {
+            String type = contentType.toLowerCase(Locale.ENGLISH);
+            isXml = type.contains("xml");
+            isJson = type.contains("json");
+        }
+        // if the content type could not tell us if it was json or xml, then fall back to
+        // what the binding was configured with in consumes
+        if (!isXml && !isJson && consumes != null) {
+            String type = consumes.toLowerCase(Locale.ENGLISH);
+            isXml = type.contains("xml");
+            isJson = type.contains("json");
+        }
+
+        // only allow xml/json if the binding mode allows that
+        isXml &= xmlAllowed;
+        isJson &= jsonAllowed;
+
+        // if we do not yet know if it is xml or json, then use the binding mode to decide
+        if (!isJson && !isXml) {
+            isXml = xmlAllowed;
+            isJson = jsonAllowed;
+        }
+
+        String accept = exchange.getIn().getHeader("Accept", String.class);
+
+        String body = null;
+        // binding is enabled when xml or json is a candidate; an empty body can make the unmarshaller
+        // fail, so force reading the body as a String which we can check and work with
+        if (exchange.getIn().getBody() != null && (isXml || isJson)) {
+            body = MessageHelper.extractBodyAsString(exchange.getIn());
+            if (body != null) {
+                exchange.getIn().setBody(body);
+
+                if (isXml && isJson) {
+                    // still undecided between xml and json, so check whether the body is xml based
+                    isXml = body.startsWith("<");
+                    isJson = !isXml;
+                }
+            }
+        }
+
+        // add missing default values which are mapped as headers
+        if (queryDefaultValues != null) {
+            for (Map.Entry<String, String> entry : queryDefaultValues.entrySet()) {
+                if (exchange.getIn().getHeader(entry.getKey()) == null) {
+                    exchange.getIn().setHeader(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+
+        // favor json over xml
+        if (isJson && jsonUnmarshal != null) {
+            // add reverse operation
+            exchange.addOnCompletion(new RestBindingMarshalOnCompletion(exchange.getFromRouteId(), jsonMarshal, xmlMarshal, false, accept));
+            if (ObjectHelper.isNotEmpty(body)) {
+                return jsonUnmarshal.process(exchange, callback);
+            } else {
+                callback.done(true);
+                return true;
+            }
+        } else if (isXml && xmlUnmarshal != null) {
+            // add reverse operation
+            exchange.addOnCompletion(new RestBindingMarshalOnCompletion(exchange.getFromRouteId(), jsonMarshal, xmlMarshal, true, accept));
+            if (ObjectHelper.isNotEmpty(body)) {
+                return xmlUnmarshal.process(exchange, callback);
+            } else {
+                callback.done(true);
+                return true;
+            }
+        }
+
+        // we could not bind
+        if ("off".equals(mode) || mode.equals("auto")) {
+            // okay for off and auto we do not mind if we could not bind
+            exchange.addOnCompletion(new RestBindingMarshalOnCompletion(exchange.getFromRouteId(), jsonMarshal, xmlMarshal, false, accept));
+            callback.done(true);
+            return true;
+        } else {
+            if (mode.contains("xml")) {
+                exchange.setException(new BindingException("Cannot bind to xml as message body is not xml compatible", exchange));
+            } else {
+                exchange.setException(new BindingException("Cannot bind to json as message body is not json compatible", exchange));
+            }
+            callback.done(true);
+            return true;
+        }
+    }
+
+    /**
+     * Returns the name of this processor, matching the class name.
+     */
+    @Override
+    public String toString() {
+        return "RestConsumerBindingProcessor";
+    }
+
+    /**
+     * Injects the {@link CamelContext} into every marshal/unmarshal processor that is
+     * {@link CamelContextAware}, and then starts the processors.
+     *
+     * @throws Exception is propagated from starting the processors
+     */
+    @Override
+    protected void doStart() throws Exception {
+        // the context must be injected before the processors are started
+        final AsyncProcessor[] bindings = {jsonMarshal, jsonUnmarshal, xmlMarshal, xmlUnmarshal};
+        for (AsyncProcessor binding : bindings) {
+            if (binding instanceof CamelContextAware) {
+                ((CamelContextAware) binding).setCamelContext(camelContext);
+            }
+        }
+        ServiceHelper.startServices(jsonMarshal, jsonUnmarshal, xmlMarshal, xmlUnmarshal);
+    }
+
+    /**
+     * Stops the json and xml marshal/unmarshal processors through {@link ServiceHelper}.
+     *
+     * @throws Exception is propagated from stopping the processors
+     */
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(jsonMarshal, jsonUnmarshal, xmlMarshal, 
xmlUnmarshal);
+    }
+
+    /**
+     * An {@link org.apache.camel.spi.Synchronization} that does the reverse operation
+     * of marshalling from POJO to json/xml.
+     * <p/>
+     * The work is done in {@link #onAfterRoute(Route, Exchange)} so the message body has been
+     * marshalled before the consumer writes the response back. Whether to use xml or json is
+     * decided from, in order: the accept value given to the constructor, the content type of the
+     * exchange, the <tt>produces</tt> setting, and finally the binding mode.
+     */
+    private final class RestBindingMarshalOnCompletion extends SynchronizationAdapter {
+
+        private final AsyncProcessor jsonMarshal;
+        private final AsyncProcessor xmlMarshal;
+        private final String routeId;
+        private final boolean wasXml;
+        private final String accept;
+
+        /**
+         * @param routeId     id of the route whose completion triggers the marshalling
+         * @param jsonMarshal processor used to marshal to json, may be <tt>null</tt>
+         * @param xmlMarshal  processor used to marshal to xml, may be <tt>null</tt>
+         * @param wasXml      tie-breaker used when both xml and json are still possible
+         * @param accept      the accept value which takes precedence when choosing the format, may be <tt>null</tt>
+         */
+        private RestBindingMarshalOnCompletion(String routeId, AsyncProcessor jsonMarshal, AsyncProcessor xmlMarshal, boolean wasXml, String accept) {
+            this.routeId = routeId;
+            this.jsonMarshal = jsonMarshal;
+            this.xmlMarshal = xmlMarshal;
+            this.wasXml = wasXml;
+            this.accept = accept;
+        }
+
+        /**
+         * Marshals the response body to json or xml, and ensures a content type header is present.
+         * <p/>
+         * Nothing is done when the completed route is not the one given at construction, when the
+         * exchange has an exception, or when binding is skipped due to a http response code of 300 or higher.
+         * Any failure during marshalling is set as exception on the exchange.
+         */
+        @Override
+        public void onAfterRoute(Route route, Exchange exchange) {
+            // we use the onAfterRoute callback, to ensure the data has been marshalled before
+            // the consumer writes the response back
+
+            // only trigger when it was the 1st route that was done
+            if (!routeId.equals(route.getId())) {
+                return;
+            }
+
+            // only marshal if there was no exception
+            if (exchange.getException() != null) {
+                return;
+            }
+
+            if (skipBindingOnErrorCode) {
+                Integer code = exchange.hasOut() ? exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class) : exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
+                // if there is a custom http error code then skip binding
+                if (code != null && code >= 300) {
+                    return;
+                }
+            }
+
+            boolean isXml = false;
+            boolean isJson = false;
+
+            // accept takes precedence
+            if (accept != null) {
+                String lowerAccept = accept.toLowerCase(Locale.ENGLISH);
+                isXml = lowerAccept.contains("xml");
+                isJson = lowerAccept.contains("json");
+            }
+            // fallback to content type if still undecided
+            if (!isXml && !isJson) {
+                String contentType = ExchangeHelper.getContentType(exchange);
+                if (contentType != null) {
+                    String lowerType = contentType.toLowerCase(Locale.ENGLISH);
+                    isXml = lowerType.contains("xml");
+                    isJson = lowerType.contains("json");
+                }
+            }
+            // if content type could not tell us if it was json or xml, then fallback to if the binding was configured with
+            // that information in the produces
+            if (!isXml && !isJson) {
+                String lowerProduces = produces != null ? produces.toLowerCase(Locale.ENGLISH) : null;
+                isXml = lowerProduces != null && lowerProduces.contains("xml");
+                isJson = lowerProduces != null && lowerProduces.contains("json");
+            }
+
+            // only allow xml/json if the binding mode allows that (when off we still want to know if its xml or json)
+            if (bindingMode != null) {
+                isXml &= bindingMode.equals("off") || bindingMode.equals("auto") || bindingMode.contains("xml");
+                isJson &= bindingMode.equals("off") || bindingMode.equals("auto") || bindingMode.contains("json");
+
+                // if we do not yet know if its xml or json, then use the binding mode to know the mode
+                if (!isJson && !isXml) {
+                    isXml = bindingMode.equals("auto") || bindingMode.contains("xml");
+                    isJson = bindingMode.equals("auto") || bindingMode.contains("json");
+                }
+            }
+
+            // in case we have not yet been able to determine if xml or json, then use the same as in the unmarshaller
+            if (isXml && isJson) {
+                isXml = wasXml;
+                isJson = !wasXml;
+            }
+
+            // need to prepare exchange first
+            ExchangeHelper.prepareOutToIn(exchange);
+
+            // ensure there is a content type header (even if binding is off)
+            ensureHeaderContentType(produces, isXml, isJson, exchange);
+
+            if (bindingMode == null || "off".equals(bindingMode)) {
+                // binding is off, so no message body binding
+                return;
+            }
+
+            // is there any marshaller at all
+            if (jsonMarshal == null && xmlMarshal == null) {
+                return;
+            }
+
+            // is the body empty
+            if ((exchange.hasOut() && exchange.getOut().getBody() == null) || (!exchange.hasOut() && exchange.getIn().getBody() == null)) {
+                return;
+            }
+
+            String contentType = exchange.getIn().getHeader(Exchange.CONTENT_TYPE, String.class);
+            // need to lower-case so the contains check below can match if using upper case
+            // (a missing header is treated as empty, so we do not fail with a NullPointerException outside the try block)
+            contentType = contentType != null ? contentType.toLowerCase(Locale.US) : "";
+            try {
+                // favor json over xml
+                if (isJson && jsonMarshal != null) {
+                    // only marshal if its json content type
+                    if (contentType.contains("json")) {
+                        jsonMarshal.process(exchange);
+                    }
+                } else if (isXml && xmlMarshal != null) {
+                    // only marshal if its xml content type
+                    if (contentType.contains("xml")) {
+                        xmlMarshal.process(exchange);
+                    }
+                } else if (!bindingMode.equals("auto")) {
+                    // we could not bind, and only for auto we do not mind if we could not bind
+                    if (bindingMode.contains("xml")) {
+                        exchange.setException(new BindingException("Cannot bind to xml as message body is not xml compatible", exchange));
+                    } else {
+                        exchange.setException(new BindingException("Cannot bind to json as message body is not json compatible", exchange));
+                    }
+                }
+            } catch (Throwable e) {
+                exchange.setException(e);
+            }
+        }
+
+        /**
+         * Sets the content type header on the IN message, but only if the exchange has no content type yet.
+         *
+         * @param contentType the preferred content type, may be <tt>null</tt>
+         * @param isXml       whether xml was selected
+         * @param isJson      whether json was selected
+         * @param exchange    the exchange to update
+         */
+        private void ensureHeaderContentType(String contentType, boolean isXml, boolean isJson, Exchange exchange) {
+            // favor given content type
+            if (contentType != null && ExchangeHelper.getContentType(exchange) == null) {
+                exchange.getIn().setHeader(Exchange.CONTENT_TYPE, contentType);
+            }
+
+            // favor json over xml
+            String fallback = null;
+            if (isJson) {
+                fallback = "application/json";
+            } else if (isXml) {
+                fallback = "application/xml";
+            }
+            // make sure there is a content-type matching the selected format
+            if (fallback != null && ExchangeHelper.getContentType(exchange) == null) {
+                exchange.getIn().setHeader(Exchange.CONTENT_TYPE, fallback);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "RestBindingMarshalOnCompletion";
+        }
+    }
+
+    /**
+     * A {@link SynchronizationAdapter} that adds the CORS headers to the response message,
+     * using the configured values and falling back to the defaults from {@link RestConfiguration}.
+     */
+    private final class RestBindingCORSOnCompletion extends SynchronizationAdapter {
+
+        private final Map<String, String> corsHeaders;
+
+        /**
+         * @param corsHeaders the configured CORS headers, may be <tt>null</tt> to use only default values
+         */
+        private RestBindingCORSOnCompletion(Map<String, String> corsHeaders) {
+            this.corsHeaders = corsHeaders;
+        }
+
+        @Override
+        public void onAfterRoute(Route route, Exchange exchange) {
+            // add the CORS headers after routing, but before the consumer writes the response
+            Message target;
+            if (exchange.hasOut()) {
+                target = exchange.getOut();
+            } else {
+                target = exchange.getIn();
+            }
+
+            // resolve each header, using the default value if none has been configured
+            String origin = configuredOrDefault("Access-Control-Allow-Origin", RestConfiguration.CORS_ACCESS_CONTROL_ALLOW_ORIGIN);
+            String methods = configuredOrDefault("Access-Control-Allow-Methods", RestConfiguration.CORS_ACCESS_CONTROL_ALLOW_METHODS);
+            String headers = configuredOrDefault("Access-Control-Allow-Headers", RestConfiguration.CORS_ACCESS_CONTROL_ALLOW_HEADERS);
+            String age = configuredOrDefault("Access-Control-Max-Age", RestConfiguration.CORS_ACCESS_CONTROL_MAX_AGE);
+
+            target.setHeader("Access-Control-Allow-Origin", origin);
+            target.setHeader("Access-Control-Allow-Methods", methods);
+            target.setHeader("Access-Control-Allow-Headers", headers);
+            target.setHeader("Access-Control-Max-Age", age);
+        }
+
+        /**
+         * Looks up the configured value of the given header, or returns the fallback if there is none.
+         */
+        private String configuredOrDefault(String name, String fallback) {
+            String configured = null;
+            if (corsHeaders != null) {
+                configured = corsHeaders.get(name);
+            }
+            return configured != null ? configured : fallback;
+        }
+
+        @Override
+        public String toString() {
+            return "RestBindingCORSOnCompletion";
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/737b49ca/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/rest/producer/JettyRestProducerPojoInOutTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/rest/producer/JettyRestProducerPojoInOutTest.java
 
b/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/rest/producer/JettyRestProducerPojoInOutTest.java
new file mode 100644
index 0000000..bd4f2bf
--- /dev/null
+++ 
b/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/rest/producer/JettyRestProducerPojoInOutTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty.rest.producer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jetty.BaseJettyTest;
+import org.apache.camel.component.jetty.rest.CountryPojo;
+import org.apache.camel.component.jetty.rest.UserPojo;
+import org.apache.camel.component.jetty.rest.UserService;
+import org.apache.camel.model.rest.RestBindingMode;
+import org.junit.Test;
+
+/**
+ * Calls rest services through <tt>rest:</tt> endpoint uris, where the services are defined
+ * with the rest DSL on the jetty component using {@link RestBindingMode#auto}.
+ * <p/>
+ * Every request carries a json content type header, and every test expects the
+ * country with iso <tt>EN</tt> and name <tt>England</tt> as response.
+ */
+public class JettyRestProducerPojoInOutTest extends BaseJettyTest {
+
+    /**
+     * Sends a GET with no message body and expects the response as a json string.
+     */
+    @Test
+    public void testJettyEmptyBody() throws Exception {
+        String out = fluentTemplate.to("rest:get:users/lives")
+                .withHeader(Exchange.CONTENT_TYPE, "application/json")
+                .request(String.class);
+
+        assertNotNull(out);
+        assertEquals("{\"iso\":\"EN\",\"country\":\"England\"}", out);
+    }
+
+    /**
+     * Sends a POST with a json string as message body and expects the response as a json string.
+     */
+    @Test
+    public void testJettyJSonBody() throws Exception {
+        String body = "{\"id\": 123, \"name\": \"Donald Duck\"}";
+
+        String out = fluentTemplate.to("rest:post:users/lives")
+                .withHeader(Exchange.CONTENT_TYPE, "application/json")
+                .withBody(body).request(String.class);
+
+        assertNotNull(out);
+        assertEquals("{\"iso\":\"EN\",\"country\":\"England\"}", out);
+    }
+
+    /**
+     * Sends a POST with a {@link UserPojo} as message body and expects the response as a json string.
+     */
+    @Test
+    public void testJettyPojoIn() throws Exception {
+        UserPojo user = new UserPojo();
+        user.setId(123);
+        user.setName("Donald Duck");
+
+        String out = fluentTemplate.to("rest:post:users/lives")
+                .withHeader(Exchange.CONTENT_TYPE, "application/json")
+                .withBody(user).request(String.class);
+
+        assertNotNull(out);
+        assertEquals("{\"iso\":\"EN\",\"country\":\"England\"}", out);
+    }
+
+    /**
+     * Sends a POST with a {@link UserPojo} as message body and requests the response
+     * as a {@link CountryPojo}.
+     */
+    @Test
+    public void testJettyPojoInOut() throws Exception {
+        UserPojo user = new UserPojo();
+        user.setId(123);
+        user.setName("Donald Duck");
+
+        CountryPojo pojo = fluentTemplate.to("rest:post:users/lives")
+                .withHeader(Exchange.CONTENT_TYPE, "application/json")
+                .withBody(user).request(CountryPojo.class);
+
+        assertNotNull(pojo);
+        assertEquals("EN", pojo.getIso());
+        assertEquals("England", pojo.getCountry());
+    }
+
+    /**
+     * Defines the rest services under test: a GET that returns a constant {@link CountryPojo}
+     * from <tt>direct:start</tt>, and a POST that calls <tt>livesWhere</tt> on {@link UserService}.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // configure to use jetty on localhost with the given port
+                // and enable auto binding mode
+                
restConfiguration().component("jetty").host("localhost").port(getPort()).bindingMode(RestBindingMode.auto);
+
+                // use the rest DSL to define the rest services
+                rest("/users/")
+                        // just return the default country here
+                        .get("lives").to("direct:start")
+                        
.post("lives").type(UserPojo.class).outType(CountryPojo.class)
+                        .route()
+                        .log("Lives where")
+                        // NOTE(review): presumably livesWhere returns England for the user id 123 used by the tests - confirm in UserService
+                        .bean(new UserService(), "livesWhere");
+
+                // the constant response of the GET service
+                CountryPojo country = new CountryPojo();
+                country.setIso("EN");
+                country.setCountry("England");
+                from("direct:start").transform().constant(country);
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/737b49ca/components/camel-jetty9/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-jetty9/src/test/resources/log4j2.properties 
b/components/camel-jetty9/src/test/resources/log4j2.properties
index c863dc3..d54c30d 100644
--- a/components/camel-jetty9/src/test/resources/log4j2.properties
+++ b/components/camel-jetty9/src/test/resources/log4j2.properties
@@ -25,4 +25,4 @@ appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
 rootLogger.level = INFO
-rootLogger.appenderRef.file.ref = file
+rootLogger.appenderRef.out.ref = out

Reply via email to