CAMEL-7354: camel-spark component.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/183bbef2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/183bbef2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/183bbef2

Branch: refs/heads/master
Commit: 183bbef21c8b4db6b1b0f83052df395c7adc8537
Parents: 3ff1072
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Jun 27 17:34:29 2014 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jun 27 17:34:29 2014 +0200

----------------------------------------------------------------------
 components/camel-spark/pom.xml                  | 32 +++++++
 .../camel/component/spark/CamelSparkRoute.java  |  9 +-
 .../component/spark/DefaultSparkBinding.java    | 89 +++++++++++++++++++-
 .../camel/component/spark/SparkBinding.java     | 22 ++++-
 .../component/spark/SparkConfiguration.java     | 10 +++
 .../camel/component/spark/SparkConstants.java   | 26 ++++++
 .../spark/SparkHeaderFilterStrategy.java        |  4 +
 7 files changed, 188 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-spark/pom.xml b/components/camel-spark/pom.xml
index d3f48f2..16f7c81 100644
--- a/components/camel-spark/pom.xml
+++ b/components/camel-spark/pom.xml
@@ -101,4 +101,36 @@
     </dependency>
   </dependencies>
 
+  <!-- unit testing requires java 8 -->
+  <profiles>
+    <profile>
+      <id>jdk8-test</id>
+      <activation>
+        <jdk>1.8</jdk>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>false</skipTests>
+              <forkedProcessTimeoutInSeconds>300</forkedProcessTimeoutInSeconds>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <skipTests>true</skipTests>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
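
Note on the profile above: the default build skips the tests and the jdk8-test profile re-enables them on a 1.8 JDK, because the Spark 2.x framework this component builds on requires Java 8 and exposes Route as a lambda-friendly functional interface. A minimal illustrative sketch (path and body are made up, not taken from the tests) of the kind of lambda route the tests exercise:

    import static spark.Spark.get;

    public class LambdaRouteSketch {

        public static void main(String[] args) {
            // registering a route starts Spark's embedded server (default port 4567);
            // the lambda form of Route only compiles on Java 8, hence the jdk8-test profile
            get("/hello", (request, response) -> "Hello from Spark");
        }
    }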

http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/CamelSparkRoute.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/CamelSparkRoute.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/CamelSparkRoute.java
index c418f88..76f1fa0 100644
--- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/CamelSparkRoute.java
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/CamelSparkRoute.java
@@ -20,6 +20,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.util.ObjectHelper;
 import spark.Request;
 import spark.Response;
 import spark.Route;
@@ -47,8 +48,12 @@ public class CamelSparkRoute implements Route {
             exchange.setException(e);
         }
 
-        if (exchange.getException() != null) {
-            // TODO: how to handle exchange failures
+        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+
+        try {
+            endpoint.getSparkBinding().toSparkResponse(msg, response, endpoint.getSparkConfiguration());
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
         }
 
         if (exchange.hasOut()) {
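
With this change the route handler always binds the resulting Camel message onto the Spark Response (via the new toSparkResponse binding below) instead of leaving failed exchanges unhandled, and rethrows binding errors as runtime exceptions. For orientation only, a standalone sketch of the same process-then-bind shape against the Spark 2.x Route API, without any of the Camel classes from this commit:

    import java.io.PrintWriter;
    import java.io.StringWriter;

    import spark.Request;
    import spark.Response;
    import spark.Route;

    public class ProcessThenBindRoute implements Route {

        @Override
        public Object handle(Request request, Response response) throws Exception {
            try {
                // stand-in for handing the request to a processor
                return "processed " + request.pathInfo();
            } catch (Exception e) {
                // mirror the binding's failure handling: 500 plus the stacktrace as plain text
                StringWriter sw = new StringWriter();
                e.printStackTrace(new PrintWriter(sw));
                response.status(500);
                response.type("text/plain");
                return sw.toString();
            }
        }
    }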

http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/DefaultSparkBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/DefaultSparkBinding.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DefaultSparkBinding.java
index f40b449..854f94b 100644
--- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/DefaultSparkBinding.java
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DefaultSparkBinding.java
@@ -16,16 +16,27 @@
  */
 package org.apache.camel.component.spark;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
+import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.TypeConverter;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import spark.Request;
+import spark.Response;
 
 public class DefaultSparkBinding implements SparkBinding {
 
@@ -90,12 +101,88 @@ public class DefaultSparkBinding implements SparkBinding {
         }
 
         String[] splat = request.splat();
-        String key = "splat";
+        String key = SparkConstants.SPLAT;
         if (headerFilterStrategy != null
                 && !headerFilterStrategy.applyFilterToExternalHeaders(key, splat, exchange)) {
             SparkHelper.appendHeader(headers, key, splat);
         }
+    }
+
+    @Override
+    public void toSparkResponse(Message message, Response response, SparkConfiguration configuration) throws Exception {
+        LOG.trace("toSparkResponse: {}", message);
+
+        // the response code is 200 for OK and 500 for failed
+        boolean failed = message.getExchange().isFailed();
+        int defaultCode = failed ? 500 : 200;
+
+        int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class);
+        response.status(code);
+        LOG.trace("HTTP Status Code: {}", code);
+
+        TypeConverter tc = message.getExchange().getContext().getTypeConverter();
+
+        // append headers
+        // must use entrySet to ensure case of keys is preserved
+        for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            // use an iterator as there can be multiple values. (must not use a delimiter)
+            final Iterator<?> it = ObjectHelper.createIterator(value, null);
+            while (it.hasNext()) {
+                String headerValue = tc.convertTo(String.class, it.next());
+                if (headerValue != null && headerFilterStrategy != null
+                        && !headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
+                    LOG.trace("HTTP-Header: {}={}", key, headerValue);
+                    response.header(key, headerValue);
+                }
+            }
+        }
+
+        // set the content type in the response.
+        String contentType = MessageHelper.getContentType(message);
+        if (contentType != null) {
+            // set content-type
+            response.header(Exchange.CONTENT_TYPE, contentType);
+            LOG.trace("Content-Type: {}", contentType);
+        }
+
+        Object body = message.getBody();
+        Exception cause = message.getExchange().getException();
+
+        // if there was an exception then use that as body
+        if (cause != null) {
+            if (configuration.isTransferException()) {
+                // we failed due an exception, and transfer it as java serialized object
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                ObjectOutputStream oos = new ObjectOutputStream(bos);
+                oos.writeObject(cause);
+                oos.flush();
+                IOHelper.close(oos, bos);
+
+                body = bos.toByteArray();
+                // force content type to be serialized java object
+                message.setHeader(Exchange.CONTENT_TYPE, SparkConstants.CONTENT_TYPE_JAVA_SERIALIZED_OBJECT);
+            } else {
+                // we failed due an exception so print it as plain text
+                StringWriter sw = new StringWriter();
+                PrintWriter pw = new PrintWriter(sw);
+                cause.printStackTrace(pw);
+
+                // the body should then be the stacktrace
+                body = sw.toString().getBytes();
+                // force content type to be text/plain as that is what the stacktrace is
+                message.setHeader(Exchange.CONTENT_TYPE, "text/plain");
+            }
 
+            // and mark the exception as failure handled, as we handled it by returning it as the response
+            ExchangeHelper.setFailureHandled(message.getExchange());
+        }
+
+        if (body != null) {
+            String str = tc.mandatoryConvertTo(String.class, message.getExchange(), body);
+            response.body(str);
+        }
     }
 
     /**
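
toSparkResponse writes the status code (500 when the exchange failed, 200 otherwise, unless Exchange.HTTP_RESPONSE_CODE says differently), copies non-filtered headers, and turns an exception either into a serialized java object (transferException=true) or into a text/plain stacktrace. A hedged sketch of how a caller could read a serialized exception back out of such a response body (the byte[] here is assumed to be the raw HTTP response payload):

    import java.io.ByteArrayInputStream;
    import java.io.ObjectInputStream;

    public final class TransferredExceptionReader {

        private TransferredExceptionReader() {
        }

        // body: raw bytes of an application/x-java-serialized-object response
        public static Exception read(byte[] body) throws Exception {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));
            try {
                // the binding serialized the exchange's exception, so the payload is an Exception
                return (Exception) ois.readObject();
            } finally {
                ois.close();
            }
        }
    }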

http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkBinding.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkBinding.java
index 61d8798..21053ae 100644
--- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkBinding.java
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkBinding.java
@@ -21,13 +21,14 @@ import java.util.Map;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import spark.Request;
+import spark.Response;
 
 public interface SparkBinding {
 
     /**
      * Binds from Spark {@link Request} to Camel {@link org.apache.camel.Message}.
      *
-     * @param request       the netty http request
+     * @param request       the Spark request
      * @param exchange      the exchange that should contain the returned message.
      * @param configuration configuration
      * @return the message to store on the given exchange
@@ -35,6 +36,25 @@ public interface SparkBinding {
      */
     Message toCamelMessage(Request request, Exchange exchange, SparkConfiguration configuration) throws Exception;
 
+    /**
+     * Binds from Spark {@link Request} to Camel headers as a {@link Map}.
+     *
+     * @param request       the Spark request
+     * @param headers       the Camel headers that should be populated
+     * @param exchange      the exchange that should contain the returned message.
+     * @param configuration the endpoint configuration
+     * @throws Exception is thrown if error during binding
+     */
     void populateCamelHeaders(Request request, Map<String, Object> headers, Exchange exchange, SparkConfiguration configuration) throws Exception;
 
+    /**
+     * Binds from Camel {@link Message} to Spark {@link Response}.
+     *
+     * @param message       the Camel message
+     * @param response      the Spark response to bind to
+     * @param configuration the endpoint configuration
+     * @throws Exception is thrown if error during binding
+     */
+    void toSparkResponse(Message message, Response response, SparkConfiguration configuration) throws Exception;
+
 }
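
The interface now covers both directions: Request-to-Message/headers and Message-to-Response. A rough sketch of customising the response direction by extending the default binding (purely illustrative; how a custom binding is plugged into the endpoint is not part of this diff):

    import org.apache.camel.Message;
    import org.apache.camel.component.spark.DefaultSparkBinding;
    import org.apache.camel.component.spark.SparkConfiguration;

    import spark.Response;

    public class AuditingSparkBinding extends DefaultSparkBinding {

        @Override
        public void toSparkResponse(Message message, Response response, SparkConfiguration configuration) throws Exception {
            // stamp every reply before the default binding writes status, headers and body
            response.header("X-Handled-By", "camel-spark");
            super.toSparkResponse(message, response, configuration);
        }
    }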

http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConfiguration.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConfiguration.java
index 0c6d2b7..a24245a 100644
--- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConfiguration.java
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConfiguration.java
@@ -28,6 +28,8 @@ public class SparkConfiguration {
     private boolean disableStreamCache;
     @UriParam
     private boolean urlDecodeHeaders;
+    @UriParam
+    private boolean transferException;
 
     public boolean isMapHeaders() {
         return mapHeaders;
@@ -52,4 +54,12 @@ public class SparkConfiguration {
     public void setUrlDecodeHeaders(boolean urlDecodeHeaders) {
         this.urlDecodeHeaders = urlDecodeHeaders;
     }
+
+    public boolean isTransferException() {
+        return transferException;
+    }
+
+    public void setTransferException(boolean transferException) {
+        this.transferException = transferException;
+    }
 }
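
As a @UriParam the new transferException flag is meant to be set as an endpoint option, but it can also be toggled programmatically on the configuration object; a trivial sketch:

    import org.apache.camel.component.spark.SparkConfiguration;

    public class ConfigureTransferException {

        public static SparkConfiguration create() {
            SparkConfiguration configuration = new SparkConfiguration();
            // return failures as a serialized exception instead of a text/plain stacktrace
            configuration.setTransferException(true);
            return configuration;
        }
    }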

http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java
new file mode 100644
index 0000000..8a026fa
--- /dev/null
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+public final class SparkConstants {
+
+    public static final String CONTENT_TYPE_JAVA_SERIALIZED_OBJECT = "application/x-java-serialized-object";
+    public static final String SPLAT = "splat";
+
+    private SparkConstants() {
+    }
+}
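
The splat values captured by Spark are copied into the Camel message under the SparkConstants.SPLAT header (see populateCamelHeaders above). A small sketch of reading them back in a processor, assuming a single String[] value as produced by request.splat():

    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.apache.camel.component.spark.SparkConstants;

    public class SplatLoggingProcessor implements Processor {

        @Override
        public void process(Exchange exchange) throws Exception {
            // the binding stores request.splat() under the SPLAT header
            String[] splat = exchange.getIn().getHeader(SparkConstants.SPLAT, String[].class);
            if (splat != null) {
                for (String part : splat) {
                    System.out.println("splat part: " + part);
                }
            }
        }
    }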

http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkHeaderFilterStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkHeaderFilterStrategy.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkHeaderFilterStrategy.java
index 33e0eb0..f729bde 100644
--- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkHeaderFilterStrategy.java
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkHeaderFilterStrategy.java
@@ -31,6 +31,7 @@ public class SparkHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
         getOutFilter().add("content-length");
         getOutFilter().add("content-type");
         getOutFilter().add("host");
+        getOutFilter().add("user-agent");
         // Add the filter for the Generic Message header
         // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.5
         getOutFilter().add("cache-control");
@@ -48,6 +49,9 @@ public class SparkHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
         // filter headers begin with "Camel" or "org.apache.camel"
         // must ignore case for Http based transports
         setOutFilterPattern("(?i)(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*");
+
+        // filter out splat as its an internal header
+        getOutFilter().add(SparkConstants.SPLAT);
     }
 
 }
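
Besides filtering user-agent on the way out, the strategy now drops the internal splat header so it never leaks into the Spark response. If more headers need suppressing, the same DefaultHeaderFilterStrategy hooks are available from a subclass; a hypothetical example (the header name is invented):

    import org.apache.camel.component.spark.SparkHeaderFilterStrategy;

    public class CustomSparkHeaderFilterStrategy extends SparkHeaderFilterStrategy {

        public CustomSparkHeaderFilterStrategy() {
            // also keep this internal header out of the Spark response
            getOutFilter().add("x-internal-correlation");
        }
    }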
