Repository: camel
Updated Branches:
  refs/heads/master 1ee0008e7 -> b46392c25


[CAMEL-9385] Initial commit of Apache Spark component.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b46392c2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b46392c2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b46392c2

Branch: refs/heads/master
Commit: b46392c25bd517cbb7d04e6dd7611d1634378a13
Parents: 1ee0008
Author: Henryk Konsek <hekon...@gmail.com>
Authored: Fri Dec 4 12:00:33 2015 +0100
Committer: Henryk Konsek <hekon...@gmail.com>
Committed: Fri Dec 4 12:00:33 2015 +0100

----------------------------------------------------------------------
 components/camel-spark/.gitignore               |   1 +
 components/camel-spark/pom.xml                  | 116 ++++++++++
 .../component/spark/DataFrameCallback.java      |  30 +++
 .../component/spark/DataFrameSparkProducer.java |  86 ++++++++
 .../camel/component/spark/EndpointType.java     |  23 ++
 .../camel/component/spark/RddCallback.java      |  30 +++
 .../camel/component/spark/RddSparkProducer.java |  85 ++++++++
 .../camel/component/spark/SparkComponent.java   |  35 +++
 .../camel/component/spark/SparkConstants.java   |  32 +++
 .../camel/component/spark/SparkEndpoint.java    | 117 ++++++++++
 .../camel/component/spark/SparkMongos.java      |  37 ++++
 .../component/spark/SparkTransformation.java    |  23 ++
 .../apache/camel/component/spark/Sparks.java    |  30 +++
 .../camel/component/spark/TypedRddCallback.java |  43 ++++
 .../camel/component/spark/VoidRddCallback.java  |  31 +++
 .../spark/annotations/AnnotatedRddCallback.java |  61 ++++++
 .../spark/annotations/RddCallback.java          |  29 +++
 .../services/org/apache/camel/component/spark   |  18 ++
 .../component/spark/SparkProducerTest.java      | 217 +++++++++++++++++++
 .../camel-spark/src/test/resources/cars.json    |   2 +
 components/camel-spark/testrdd.txt              |  17 ++
 components/pom.xml                              |   1 +
 22 files changed, 1064 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/.gitignore
----------------------------------------------------------------------
diff --git a/components/camel-spark/.gitignore 
b/components/camel-spark/.gitignore
new file mode 100644
index 0000000..3d55803
--- /dev/null
+++ b/components/camel-spark/.gitignore
@@ -0,0 +1 @@
+metastore_db
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-spark/pom.xml b/components/camel-spark/pom.xml
new file mode 100644
index 0000000..2551f22
--- /dev/null
+++ b/components/camel-spark/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>components</artifactId>
+        <groupId>org.apache.camel</groupId>
+        <version>2.17-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-spark</artifactId>
+    <packaging>bundle</packaging>
+    <name>Camel :: Apache Spark</name>
+    <description>Camel Apache Spark Support</description>
+
+    <properties>
+        
<camel.osgi.export.pkg>org.apache.camel.component.spark.*</camel.osgi.export.pkg>
+        
<camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=spark</camel.osgi.export.service>
+
+        <!-- Spark 1.5.1 requires Jackson 2.4.4 -->
+        <jackson-version>2.4.4</jackson-version>
+
+        <google-truth-version>0.27</google-truth-version>
+        <hadoop.version>2.2.0</hadoop.version>
+        <mongo-hadoop-core.version>1.4.2</mongo-hadoop-core.version>
+        <spark-version>1.5.1</spark-version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>${spark-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.11</artifactId>
+            <version>${spark-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <!-- Optional -->
+        <dependency>
+            <groupId>org.mongodb.mongo-hadoop</groupId>
+            <artifactId>mongo-hadoop-core</artifactId>
+            <version>${mongo-hadoop-core.version}</version>
+            <optional>true</optional>
+        </dependency>
+
+        <!-- Testing -->
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.truth</groupId>
+            <artifactId>truth</artifactId>
+            <version>${google-truth-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameCallback.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameCallback.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameCallback.java
new file mode 100644
index 0000000..2ead4bc
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameCallback.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import org.apache.spark.sql.DataFrame;
+
+/**
+ * Generic block of code with parameters which can be executed against Spark Data Frames and return results.
+ *
+ * @param <T> results type
+ */
+public interface DataFrameCallback<T> {
+
+    /**
+     * Executes this callback against the given Data Frame.
+     *
+     * @param dataFrame the Data Frame to operate on
+     * @param payloads  additional arguments for the callback; {@code DataFrameSparkProducer} passes the
+     *                  incoming message body here, spreading a {@code java.util.List} body over the varargs
+     * @return the result of the computation
+     */
+    T onDataFrame(DataFrame dataFrame, Object... payloads);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameSparkProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameSparkProducer.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameSparkProducer.java
new file mode 100644
index 0000000..0d40695
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameSparkProducer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.DataFrame;
+
+import static 
org.apache.camel.component.spark.SparkConstants.SPARK_DATAFRAME_CALLBACK_HEADER;
+import static 
org.apache.camel.component.spark.SparkConstants.SPARK_DATAFRAME_HEADER;
+import static org.apache.camel.component.spark.SparkConstants.SPARK_RDD_HEADER;
+
+/**
+ * Producer which executes a {@link DataFrameCallback} against a Spark {@link DataFrame}.
+ * <p>
+ * Both the Data Frame and the callback are looked up in the message headers first and in the endpoint
+ * configuration second. The incoming message body is handed to the callback as its payload.
+ */
+public class DataFrameSparkProducer extends DefaultProducer {
+
+    public DataFrameSparkProducer(SparkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Resolves the Data Frame and the callback, invokes the callback with the message body and stores
+     * the outcome on the IN message.
+     *
+     * @param exchange the exchange being processed
+     * @throws IllegalStateException if no Data Frame or no callback can be resolved
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        DataFrame dataFrame = resolveDataFrame(exchange);
+        DataFrameCallback dataFrameCallback = resolveDataFrameCallback(exchange);
+        Object body = exchange.getIn().getBody();
+        // A List body is spread over the callback's varargs; any other body is passed as a single payload.
+        Object result = body instanceof List
+                ? dataFrameCallback.onDataFrame(dataFrame, ((List) body).toArray())
+                : dataFrameCallback.onDataFrame(dataFrame, body);
+        collectResults(exchange, result);
+    }
+
+    @Override
+    public SparkEndpoint getEndpoint() {
+        return (SparkEndpoint) super.getEndpoint();
+    }
+
+    // Helpers
+
+    /**
+     * Writes the callback result to the IN message. A {@link JavaRDD} result is collected into the body
+     * when the endpoint's {@code collect} option is enabled; otherwise the RDD itself becomes the body and
+     * is also published under the {@code SPARK_RDD_HEADER} header. Any other result is set as the body as is.
+     *
+     * @param exchange the exchange to update
+     * @param result   the value returned by the callback
+     */
+    protected void collectResults(Exchange exchange, Object result) {
+        if (result instanceof JavaRDD) {
+            JavaRDD rddResults = (JavaRDD) result;
+            if (getEndpoint().isCollect()) {
+                exchange.getIn().setBody(rddResults.collect());
+            } else {
+                exchange.getIn().setBody(result);
+                exchange.getIn().setHeader(SPARK_RDD_HEADER, result);
+            }
+        } else {
+            exchange.getIn().setBody(result);
+        }
+    }
+
+    /**
+     * Resolves the Data Frame to operate on; the message header takes precedence over the endpoint setting.
+     *
+     * @param exchange the exchange being processed
+     * @return the resolved Data Frame, never {@code null}
+     * @throws IllegalStateException if neither the header nor the endpoint provides a Data Frame
+     */
+    protected DataFrame resolveDataFrame(Exchange exchange) {
+        Object header = exchange.getIn().getHeader(SPARK_DATAFRAME_HEADER);
+        if (header != null) {
+            return (DataFrame) header;
+        } else if (getEndpoint().getDataFrame() != null) {
+            return getEndpoint().getDataFrame();
+        } else {
+            throw new IllegalStateException("No Data Frame defined.");
+        }
+    }
+
+    /**
+     * Resolves the callback to execute; the message header takes precedence over the endpoint setting.
+     *
+     * @param exchange the exchange being processed
+     * @return the resolved callback, never {@code null}
+     * @throws IllegalStateException if neither the header nor the endpoint provides a callback
+     */
+    protected DataFrameCallback resolveDataFrameCallback(Exchange exchange) {
+        Object header = exchange.getIn().getHeader(SPARK_DATAFRAME_CALLBACK_HEADER);
+        if (header != null) {
+            return (DataFrameCallback) header;
+        } else if (getEndpoint().getDataFrameCallback() != null) {
+            return getEndpoint().getDataFrameCallback();
+        } else {
+            // It is the callback, not the Data Frame, that is missing at this point.
+            throw new IllegalStateException("Cannot resolve Data Frame callback.");
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
new file mode 100644
index 0000000..b471e39
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+/**
+ * Kinds of Spark endpoints. {@code SparkEndpoint#createProducer()} creates an RDD producer for
+ * {@link #rdd} and a Data Frame producer otherwise.
+ */
+public enum EndpointType {
+
+    /** Endpoint operating on RDDs. */
+    rdd,
+
+    /** Endpoint operating on Data Frames. */
+    dataframe
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddCallback.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddCallback.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddCallback.java
new file mode 100644
index 0000000..ed514b6
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddCallback.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import org.apache.spark.api.java.AbstractJavaRDDLike;
+
+/**
+ * Generic block of code with parameters which can be executed against RDD and return results.
+ *
+ * @param <T> results type
+ */
+public interface RddCallback<T> {
+
+    /**
+     * Executes this callback against the given RDD.
+     *
+     * @param rdd      the RDD to operate on
+     * @param payloads additional arguments for the callback; {@code RddSparkProducer} passes the incoming
+     *                 message body here, spreading a {@code java.util.List} body over the varargs
+     * @return the result of the computation
+     */
+    T onRdd(AbstractJavaRDDLike rdd, Object... payloads);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddSparkProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddSparkProducer.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddSparkProducer.java
new file mode 100644
index 0000000..1977bed
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddSparkProducer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.spark.api.java.AbstractJavaRDDLike;
+import org.apache.spark.api.java.JavaRDD;
+
+import static 
org.apache.camel.component.spark.SparkConstants.SPARK_RDD_CALLBACK_HEADER;
+import static org.apache.camel.component.spark.SparkConstants.SPARK_RDD_HEADER;
+
+/**
+ * Producer which executes an {@link RddCallback} against a Spark RDD.
+ * <p>
+ * Both the RDD and the callback are looked up in the message headers first and in the endpoint
+ * configuration second. The incoming message body is handed to the callback as its payload.
+ */
+public class RddSparkProducer extends DefaultProducer {
+
+    public RddSparkProducer(SparkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Resolves the RDD and the callback, invokes the callback with the message body and stores the
+     * outcome on the IN message.
+     *
+     * @param exchange the exchange being processed
+     * @throws IllegalStateException if no RDD or no callback can be resolved
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AbstractJavaRDDLike rdd = resolveRdd(exchange);
+        RddCallback rddCallback = resolveRddCallback(exchange);
+        Object body = exchange.getIn().getBody();
+        // A List body is spread over the callback's varargs; any other body is passed as a single payload.
+        Object result = body instanceof List
+                ? rddCallback.onRdd(rdd, ((List) body).toArray())
+                : rddCallback.onRdd(rdd, body);
+        collectResults(exchange, result);
+    }
+
+    @Override
+    public SparkEndpoint getEndpoint() {
+        return (SparkEndpoint) super.getEndpoint();
+    }
+
+    // Helpers
+
+    /**
+     * Writes the callback result to the IN message. A {@link JavaRDD} result is collected into the body
+     * when the endpoint's {@code collect} option is enabled; otherwise the RDD itself becomes the body and
+     * is also published under the {@code SPARK_RDD_HEADER} header. Any other result is set as the body as is.
+     *
+     * @param exchange the exchange to update
+     * @param result   the value returned by the callback
+     */
+    protected void collectResults(Exchange exchange, Object result) {
+        if (result instanceof JavaRDD) {
+            JavaRDD rddResults = (JavaRDD) result;
+            if (getEndpoint().isCollect()) {
+                exchange.getIn().setBody(rddResults.collect());
+            } else {
+                exchange.getIn().setBody(result);
+                exchange.getIn().setHeader(SPARK_RDD_HEADER, result);
+            }
+        } else {
+            exchange.getIn().setBody(result);
+        }
+    }
+
+    /**
+     * Resolves the RDD to operate on; the message header takes precedence over the endpoint setting.
+     *
+     * @param exchange the exchange being processed
+     * @return the resolved RDD, never {@code null}
+     * @throws IllegalStateException if neither the header nor the endpoint provides an RDD
+     */
+    protected AbstractJavaRDDLike resolveRdd(Exchange exchange) {
+        Object header = exchange.getIn().getHeader(SPARK_RDD_HEADER);
+        if (header != null) {
+            // Cast to the declared return type rather than JavaRDD, so that other RDD flavours
+            // (e.g. JavaPairRDD) supplied via the header are accepted just as they are via the endpoint.
+            return (AbstractJavaRDDLike) header;
+        } else if (getEndpoint().getRdd() != null) {
+            return getEndpoint().getRdd();
+        } else {
+            throw new IllegalStateException("No RDD defined.");
+        }
+    }
+
+    /**
+     * Resolves the callback to execute; the message header takes precedence over the endpoint setting.
+     *
+     * @param exchange the exchange being processed
+     * @return the resolved callback, never {@code null}
+     * @throws IllegalStateException if neither the header nor the endpoint provides a callback
+     */
+    protected RddCallback resolveRddCallback(Exchange exchange) {
+        Object header = exchange.getIn().getHeader(SPARK_RDD_CALLBACK_HEADER);
+        if (header != null) {
+            return (RddCallback) header;
+        } else if (getEndpoint().getRddCallback() != null) {
+            return getEndpoint().getRddCallback();
+        } else {
+            throw new IllegalStateException("Cannot resolve RDD callback.");
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkComponent.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkComponent.java
new file mode 100644
index 0000000..08c34d6
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkComponent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+
+public class SparkComponent extends UriEndpointComponent {
+
+    public SparkComponent() {
+        super(SparkEndpoint.class);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
+        return new SparkEndpoint(uri, this, EndpointType.valueOf(remaining));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java
new file mode 100644
index 0000000..5f96ee8
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+/**
+ * Names of the message headers understood by the Spark component.
+ */
+public final class SparkConstants {
+
+    private SparkConstants() {
+        // constants holder, not meant to be instantiated
+    }
+
+    /** Header carrying the RDD to operate on; also set by the producers when an RDD result is not collected. */
+    public static final String SPARK_RDD_HEADER = "CAMEL_SPARK_RDD";
+
+    /** Header carrying the {@code RddCallback} to execute. */
+    public static final String SPARK_RDD_CALLBACK_HEADER = "CAMEL_SPARK_RDD_CALLBACK";
+
+    /** Header carrying the Data Frame to operate on. */
+    public static final String SPARK_DATAFRAME_HEADER = "CAMEL_SPARK_DATAFRAME";
+
+    /** Header carrying the {@code DataFrameCallback} to execute. */
+    public static final String SPARK_DATAFRAME_CALLBACK_HEADER = "CAMEL_SPARK_DATAFRAME_CALLBACK";
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
new file mode 100644
index 0000000..8a5247b
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.spark.api.java.AbstractJavaRDDLike;
+import org.apache.spark.sql.DataFrame;
+
+/**
+ * Producer-only endpoint of the Spark component. Depending on its {@link EndpointType} it creates a
+ * producer working on RDDs or on Data Frames.
+ */
+public class SparkEndpoint extends DefaultEndpoint {
+
+    // Endpoint collaborators
+
+    /** RDD used when the message does not carry one in its headers. */
+    private AbstractJavaRDDLike rdd;
+
+    /** RDD callback used when the message does not carry one in its headers. */
+    private RddCallback rddCallback;
+
+    /** Data Frame used when the message does not carry one in its headers. */
+    private DataFrame dataFrame;
+
+    /** Data Frame callback used when the message does not carry one in its headers. */
+    private DataFrameCallback dataFrameCallback;
+
+    // Endpoint configuration
+
+    /** Decides which producer implementation {@link #createProducer()} returns. */
+    private final EndpointType endpointType;
+
+    /** Whether RDD results should be collected into the message body; enabled by default. */
+    private boolean collect = true;
+
+    // Constructors
+
+    public SparkEndpoint(String endpointUri, SparkComponent component, EndpointType endpointType) {
+        super(endpointUri, component);
+        this.endpointType = endpointType;
+    }
+
+    // Overridden
+
+    /**
+     * Creates an {@link RddSparkProducer} for {@link EndpointType#rdd} endpoints and a
+     * {@link DataFrameSparkProducer} for every other type.
+     */
+    @Override
+    public Producer createProducer() throws Exception {
+        if (endpointType == EndpointType.rdd) {
+            return new RddSparkProducer(this);
+        }
+        return new DataFrameSparkProducer(this);
+    }
+
+    /**
+     * Always fails, as this endpoint cannot be consumed from.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new UnsupportedOperationException("Spark component supports producer endpoints only.");
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    // Setters & getters
+
+    @Override
+    public SparkComponent getComponent() {
+        return (SparkComponent) super.getComponent();
+    }
+
+    public AbstractJavaRDDLike getRdd() {
+        return this.rdd;
+    }
+
+    public void setRdd(AbstractJavaRDDLike rdd) {
+        this.rdd = rdd;
+    }
+
+    public RddCallback getRddCallback() {
+        return this.rddCallback;
+    }
+
+    public void setRddCallback(RddCallback rddCallback) {
+        this.rddCallback = rddCallback;
+    }
+
+    public DataFrame getDataFrame() {
+        return this.dataFrame;
+    }
+
+    public void setDataFrame(DataFrame dataFrame) {
+        this.dataFrame = dataFrame;
+    }
+
+    public DataFrameCallback getDataFrameCallback() {
+        return this.dataFrameCallback;
+    }
+
+    public void setDataFrameCallback(DataFrameCallback dataFrameCallback) {
+        this.dataFrameCallback = dataFrameCallback;
+    }
+
+    public boolean isCollect() {
+        return this.collect;
+    }
+
+    public void setCollect(boolean collect) {
+        this.collect = collect;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkMongos.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkMongos.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkMongos.java
new file mode 100644
index 0000000..46ebb15
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkMongos.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import com.mongodb.hadoop.MongoInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.bson.BSONObject;
+
+/**
+ * Helpers for exposing MongoDB collections as Spark RDDs by means of the mongo-hadoop connector.
+ */
+public final class SparkMongos {
+
+    private SparkMongos() {
+    }
+
+    /**
+     * Creates a pair RDD backed by the given MongoDB collection.
+     *
+     * @param sparkContext the Spark context used to create the RDD
+     * @param mongoHost    host name of the MongoDB server
+     * @param mongoPort    port of the MongoDB server
+     * @param db           name of the database
+     * @param collection   name of the collection within {@code db}
+     * @return RDD created by {@code newAPIHadoopRDD} with {@code Object} keys and {@link BSONObject} values
+     */
+    public static JavaPairRDD<Object, BSONObject> mongoRdd(JavaSparkContext sparkContext, String mongoHost, long mongoPort, String db, String collection) {
+        // Plain concatenation instead of String.format("%d"): the formatter localizes digits according to
+        // the default locale, which could yield an invalid connection URI on some systems.
+        String inputUri = "mongodb://" + mongoHost + ":" + mongoPort + "/" + db + "." + collection;
+
+        Configuration mongodbConfig = new Configuration();
+        mongodbConfig.set("mongo.job.input.format", MongoInputFormat.class.getName());
+        mongodbConfig.set("mongo.input.uri", inputUri);
+        return sparkContext.newAPIHadoopRDD(mongodbConfig, MongoInputFormat.class, Object.class, BSONObject.class);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java
new file mode 100644
index 0000000..4a88e8e
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+/**
+ * Enumerates the transformation kinds {@code FILTER} and {@code MAP}.
+ */
+public enum SparkTransformation {
+
+    FILTER,
+    MAP
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/Sparks.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/Sparks.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/Sparks.java
new file mode 100644
index 0000000..6021e0c
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/Sparks.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Factory helpers for Spark contexts.
+ */
+public final class Sparks {
+
+    private static final String LOCAL_MASTER = "local[*]";
+
+    private static final String LOCAL_APP_NAME = "local-camel-spark-context";
+
+    // Utility class, not meant to be instantiated.
+    private Sparks() {
+    }
+
+    /**
+     * Creates a new {@link JavaSparkContext} using the master
+     * {@code local[*]} and the application name
+     * {@code local-camel-spark-context}.
+     *
+     * @return a newly constructed context; every call builds another one
+     */
+    public static JavaSparkContext createLocalSparkContext() {
+        return new JavaSparkContext(LOCAL_MASTER, LOCAL_APP_NAME);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java
new file mode 100644
index 0000000..cb97c81
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import org.apache.camel.CamelContext;
+import org.apache.spark.api.java.AbstractJavaRDDLike;
+
+public abstract class TypedRddCallback<T> implements RddCallback<T> {
+
+    private final CamelContext camelContext;
+
+    private final Class[] payloadsTypes;
+
+    public TypedRddCallback(CamelContext camelContext, Class[] payloadsTypes) {
+        this.camelContext = camelContext;
+        this.payloadsTypes = payloadsTypes;
+    }
+
+    @Override
+    public T onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
+        for (int i = 0; i < payloads.length; i++) {
+            payloads[i] = 
camelContext.getTypeConverter().convertTo(payloadsTypes[i], payloads[i]);
+        }
+        return doOnRdd(rdd, payloads);
+    }
+
+    public abstract T doOnRdd(AbstractJavaRDDLike rdd, Object... payloads);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/VoidRddCallback.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/VoidRddCallback.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/VoidRddCallback.java
new file mode 100644
index 0000000..24e62bd
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/VoidRddCallback.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import org.apache.spark.api.java.AbstractJavaRDDLike;
+
+/**
+ * Base class for RDD callbacks that do not produce a result.
+ */
+public abstract class VoidRddCallback implements RddCallback<Void> {
+
+    /**
+     * Runs {@link #doOnRdd(AbstractJavaRDDLike, Object...)} with the same
+     * arguments.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Void onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
+        doOnRdd(rdd, payloads);
+        return null;
+    }
+
+    /**
+     * Callback logic; implementations return nothing.
+     *
+     * @param rdd      RDD given to {@link #onRdd(AbstractJavaRDDLike, Object...)}
+     * @param payloads payloads given to {@link #onRdd(AbstractJavaRDDLike, Object...)}
+     */
+    public abstract void doOnRdd(AbstractJavaRDDLike rdd, Object... payloads);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
new file mode 100644
index 0000000..8a988f4
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark.annotations;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+
+import org.apache.spark.api.java.AbstractJavaRDDLike;
+
+import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation;
+
+/**
+ * Adapts an arbitrary object with a method annotated with
+ * {@link RddCallback} to the
+ * {@link org.apache.camel.component.spark.RddCallback} interface.
+ */
+public final class AnnotatedRddCallback {
+
+    // Utility class, not meant to be instantiated.
+    private AnnotatedRddCallback() {
+    }
+
+    /**
+     * Wraps the given object into an RDD callback that reflectively invokes
+     * its annotated method.
+     * <p>
+     * If several methods carry the annotation, the first one returned by the
+     * lookup is used. The method is invoked with the RDD followed by the
+     * payloads. If the first payload is {@code null} it is left out of the
+     * argument list.
+     *
+     * @param callback object declaring a method annotated with {@link RddCallback}
+     * @return callback delegating to the annotated method; failures of the
+     *         reflective call are rethrown wrapped in a {@link RuntimeException}
+     * @throws UnsupportedOperationException if no annotated method is found
+     */
+    public static org.apache.camel.component.spark.RddCallback annotatedRddCallback(final Object callback) {
+        List<Method> rddCallbacks = findMethodsWithAnnotation(callback.getClass(), RddCallback.class);
+        if (rddCallbacks.isEmpty()) {
+            throw new UnsupportedOperationException("Can't find methods annotated with @RddCallback.");
+        }
+
+        // Resolve the target method once instead of on every invocation.
+        final Method callbackMethod = rddCallbacks.get(0);
+        callbackMethod.setAccessible(true);
+
+        return new org.apache.camel.component.spark.RddCallback() {
+            @Override
+            public Object onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
+                List<Object> arguments = new ArrayList<>(payloads.length + 1);
+                arguments.add(rdd);
+                arguments.addAll(asList(payloads));
+                // Drop a null first payload (presumably a missing message body) so a method
+                // taking only the RDD can be called. The size check keeps an empty payload
+                // list from failing with IndexOutOfBoundsException.
+                if (arguments.size() > 1 && arguments.get(1) == null) {
+                    arguments.remove(1);
+                }
+
+                try {
+                    return callbackMethod.invoke(callback, arguments.toArray(new Object[arguments.size()]));
+                } catch (IllegalAccessException | InvocationTargetException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/RddCallback.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/RddCallback.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/RddCallback.java
new file mode 100644
index 0000000..61dcae4
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/RddCallback.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation for RDD callback methods. It is retained at runtime and
+ * may be placed on methods and parameters.
+ */
+@Inherited
+@Target({ElementType.METHOD, ElementType.PARAMETER})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RddCallback {
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/resources/META-INF/services/org/apache/camel/component/spark
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/resources/META-INF/services/org/apache/camel/component/spark
 
b/components/camel-spark/src/main/resources/META-INF/services/org/apache/camel/component/spark
new file mode 100644
index 0000000..db7a2dd
--- /dev/null
+++ 
b/components/camel-spark/src/main/resources/META-INF/services/org/apache/camel/component/spark
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.spark.SparkComponent

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
 
b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
new file mode 100644
index 0000000..4b2ac18
--- /dev/null
+++ 
b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import java.io.File;
+import java.io.IOException;
+
+import static java.util.Arrays.asList;
+
+import com.google.common.truth.Truth;
+import org.apache.camel.component.spark.annotations.RddCallback;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.spark.api.java.AbstractJavaRDDLike;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.hive.HiveContext;
+import org.junit.Test;
+
+import static 
org.apache.camel.component.spark.SparkConstants.SPARK_DATAFRAME_CALLBACK_HEADER;
+import static 
org.apache.camel.component.spark.SparkConstants.SPARK_RDD_CALLBACK_HEADER;
+import static org.apache.camel.component.spark.Sparks.createLocalSparkContext;
+import static 
org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
+
+/**
+ * Tests sending messages to {@code spark:rdd} and {@code spark:dataframe}
+ * endpoints with callbacks supplied via message headers or the registry.
+ * <p>
+ * The RDD fixture is read from {@code testrdd.txt} and the data frame fixture
+ * from {@code src/test/resources/cars.json}; the expected counts in the
+ * assertions depend on the content of these files.
+ */
+public class SparkProducerTest extends CamelTestSupport {
+
+    // Fixtures
+
+    static JavaSparkContext sparkContext = createLocalSparkContext();
+
+    static HiveContext hiveContext = new HiveContext(sparkContext.sc());
+
+    String sparkUri = "spark:rdd?rdd=#pomRdd";
+
+    String sparkDataFrameUri = "spark:dataframe?dataFrame=#jsonCars";
+
+    // Routes fixtures
+
+    /**
+     * Binds the RDD, the data frame (also registered as temp table
+     * {@code cars}) and a line-counting callback in the registry.
+     */
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+
+        registry.bind("pomRdd", sparkContext.textFile("testrdd.txt"));
+
+        DataFrame jsonCars = hiveContext.read().json("src/test/resources/cars.json");
+        jsonCars.registerTempTable("cars");
+        registry.bind("jsonCars", jsonCars);
+
+        registry.bind("countLinesTransformation", new org.apache.camel.component.spark.RddCallback<Object>() {
+            @Override
+            public Object onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
+                return rdd.count();
+            }
+        });
+        return registry;
+    }
+
+    // Helpers
+
+    /**
+     * Reserves a unique temporary path and removes the placeholder file, so
+     * that the path can be used as a not yet existing output location.
+     */
+    private static File nonExistingTempPath() throws IOException {
+        File path = File.createTempFile("camel", "spark");
+        Truth.assertThat(path.delete()).isTrue();
+        return path;
+    }
+
+    /**
+     * Asserts that the given path is a directory containing at least one entry.
+     * {@code File.length()} is not used as its result is unspecified for directories.
+     */
+    private static void assertNonEmptyDirectory(File directory) {
+        Truth.assertThat(directory.isDirectory()).isTrue();
+        Truth.assertThat(directory.list().length).isGreaterThan(0);
+    }
+
+    // Tests
+
+    @Test
+    public void shouldExecuteRddCallback() {
+        long pomLinesCount = template.requestBodyAndHeader(sparkUri, null, SPARK_RDD_CALLBACK_HEADER,
+            new org.apache.camel.component.spark.RddCallback<Long>() {
+                @Override
+                public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
+                    return rdd.count();
+                }
+            }, Long.class);
+        Truth.assertThat(pomLinesCount).isEqualTo(17);
+    }
+
+    @Test
+    public void shouldExecuteRddCallbackWithSinglePayload() {
+        long pomLinesCount = template.requestBodyAndHeader(sparkUri, 10, SPARK_RDD_CALLBACK_HEADER,
+            new org.apache.camel.component.spark.RddCallback<Long>() {
+                @Override
+                public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
+                    return rdd.count() * (int) payloads[0];
+                }
+            }, Long.class);
+        Truth.assertThat(pomLinesCount).isEqualTo(170);
+    }
+
+    @Test
+    public void shouldExecuteRddCallbackWithPayloads() {
+        long pomLinesCount = template.requestBodyAndHeader(sparkUri, asList(10, 10), SPARK_RDD_CALLBACK_HEADER,
+            new org.apache.camel.component.spark.RddCallback<Long>() {
+                @Override
+                public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
+                    return rdd.count() * (int) payloads[0] * (int) payloads[1];
+                }
+            }, Long.class);
+        Truth.assertThat(pomLinesCount).isEqualTo(1700);
+    }
+
+    @Test
+    public void shouldExecuteRddCallbackWithTypedPayloads() {
+        // String payloads are expected to reach doOnRdd converted to the declared int types.
+        TypedRddCallback<Long> rddCallback = new TypedRddCallback<Long>(context, new Class[]{int.class, int.class}) {
+            @Override
+            public Long doOnRdd(AbstractJavaRDDLike rdd, Object... payloads) {
+                return rdd.count() * (int) payloads[0] * (int) payloads[1];
+            }
+        };
+        long pomLinesCount = template.requestBodyAndHeader(sparkUri, asList("10", "10"),
+            SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class);
+        Truth.assertThat(pomLinesCount).isEqualTo(1700);
+    }
+
+    @Test
+    public void shouldUseTransformationFromRegistry() {
+        long pomLinesCount = template.requestBody(sparkUri + "&rddCallback=#countLinesTransformation", null, Long.class);
+        Truth.assertThat(pomLinesCount).isGreaterThan(0L);
+    }
+
+    @Test
+    public void shouldExecuteVoidCallback() throws IOException {
+        // Given
+        final File output = nonExistingTempPath();
+
+        // When
+        template.sendBodyAndHeader(sparkUri, null, SPARK_RDD_CALLBACK_HEADER, new VoidRddCallback() {
+            @Override
+            public void doOnRdd(AbstractJavaRDDLike rdd, Object... payloads) {
+                rdd.saveAsTextFile(output.getAbsolutePath());
+            }
+        });
+
+        // Then
+        assertNonEmptyDirectory(output);
+    }
+
+    @Test
+    public void shouldExecuteAnnotatedCallback() {
+        org.apache.camel.component.spark.RddCallback rddCallback = annotatedRddCallback(new Object() {
+            @RddCallback
+            long countLines(JavaRDD<String> textFile) {
+                return textFile.count();
+            }
+        });
+        long pomLinesCount = template.requestBodyAndHeader(sparkUri, null,
+            SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class);
+        Truth.assertThat(pomLinesCount).isEqualTo(17);
+    }
+
+    @Test
+    public void shouldExecuteAnnotatedVoidCallback() throws IOException {
+        // Given
+        final File output = nonExistingTempPath();
+        org.apache.camel.component.spark.RddCallback rddCallback = annotatedRddCallback(new Object() {
+            @RddCallback
+            void countLines(JavaRDD<String> textFile) {
+                textFile.saveAsTextFile(output.getAbsolutePath());
+            }
+        });
+
+        // When
+        template.sendBodyAndHeader(sparkUri, null, SPARK_RDD_CALLBACK_HEADER, rddCallback);
+
+        // Then
+        assertNonEmptyDirectory(output);
+    }
+
+    // Hive tests
+
+    @Test
+    public void shouldExecuteHiveQuery() {
+        DataFrameCallback<Long> callback = new DataFrameCallback<Long>() {
+            @Override
+            public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
+                return hiveContext.sql("SELECT * FROM cars").count();
+            }
+        };
+        long tablesCount = template.requestBodyAndHeader(sparkDataFrameUri, null,
+            SPARK_DATAFRAME_CALLBACK_HEADER, callback, Long.class);
+        Truth.assertThat(tablesCount).isEqualTo(2);
+    }
+
+    // Data frames tests
+
+    @Test
+    public void shouldCountFrame() {
+        DataFrameCallback<Long> callback = new DataFrameCallback<Long>() {
+            @Override
+            public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
+                return dataFrame.count();
+            }
+        };
+        long tablesCount = template.requestBodyAndHeader(sparkDataFrameUri, null,
+            SPARK_DATAFRAME_CALLBACK_HEADER, callback, Long.class);
+        Truth.assertThat(tablesCount).isEqualTo(2);
+    }
+
+    @Test
+    public void shouldExecuteConditionalFrameCount() {
+        DataFrameCallback<Long> callback = new DataFrameCallback<Long>() {
+            @Override
+            public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
+                String model = (String) payloads[0];
+                return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
+            }
+        };
+        long tablesCount = template.requestBodyAndHeader(sparkDataFrameUri, "Micra",
+            SPARK_DATAFRAME_CALLBACK_HEADER, callback, Long.class);
+        Truth.assertThat(tablesCount).isEqualTo(1);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/test/resources/cars.json
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/test/resources/cars.json 
b/components/camel-spark/src/test/resources/cars.json
new file mode 100644
index 0000000..27d22de
--- /dev/null
+++ b/components/camel-spark/src/test/resources/cars.json
@@ -0,0 +1,2 @@
+{"brand": "nissan", "model": "X-trail"}
+{"brand": "nissan", "model": "Micra"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/testrdd.txt
----------------------------------------------------------------------
diff --git a/components/camel-spark/testrdd.txt 
b/components/camel-spark/testrdd.txt
new file mode 100644
index 0000000..6a1fa8e
--- /dev/null
+++ b/components/camel-spark/testrdd.txt
@@ -0,0 +1,17 @@
+# Licensed to the Rhiot under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+foo bar
+baz qux
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index 8c0f342..fe40980 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -209,6 +209,7 @@
     <module>camel-slack</module>
     <module>camel-soap</module>
     <module>camel-solr</module>
+    <module>camel-spark</module>
     <module>camel-spark-rest</module>
     <module>camel-splunk</module>
     <module>camel-spring-batch</module>

Reply via email to