Repository: camel Updated Branches: refs/heads/master 1ee0008e7 -> b46392c25
[CAMEL-9385] Initial commit of Apache Spark component. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b46392c2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b46392c2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b46392c2 Branch: refs/heads/master Commit: b46392c25bd517cbb7d04e6dd7611d1634378a13 Parents: 1ee0008 Author: Henryk Konsek <hekon...@gmail.com> Authored: Fri Dec 4 12:00:33 2015 +0100 Committer: Henryk Konsek <hekon...@gmail.com> Committed: Fri Dec 4 12:00:33 2015 +0100 ---------------------------------------------------------------------- components/camel-spark/.gitignore | 1 + components/camel-spark/pom.xml | 116 ++++++++++ .../component/spark/DataFrameCallback.java | 30 +++ .../component/spark/DataFrameSparkProducer.java | 86 ++++++++ .../camel/component/spark/EndpointType.java | 23 ++ .../camel/component/spark/RddCallback.java | 30 +++ .../camel/component/spark/RddSparkProducer.java | 85 ++++++++ .../camel/component/spark/SparkComponent.java | 35 +++ .../camel/component/spark/SparkConstants.java | 32 +++ .../camel/component/spark/SparkEndpoint.java | 117 ++++++++++ .../camel/component/spark/SparkMongos.java | 37 ++++ .../component/spark/SparkTransformation.java | 23 ++ .../apache/camel/component/spark/Sparks.java | 30 +++ .../camel/component/spark/TypedRddCallback.java | 43 ++++ .../camel/component/spark/VoidRddCallback.java | 31 +++ .../spark/annotations/AnnotatedRddCallback.java | 61 ++++++ .../spark/annotations/RddCallback.java | 29 +++ .../services/org/apache/camel/component/spark | 18 ++ .../component/spark/SparkProducerTest.java | 217 +++++++++++++++++++ .../camel-spark/src/test/resources/cars.json | 2 + components/camel-spark/testrdd.txt | 17 ++ components/pom.xml | 1 + 22 files changed, 1064 insertions(+) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/.gitignore ---------------------------------------------------------------------- diff --git a/components/camel-spark/.gitignore b/components/camel-spark/.gitignore new file mode 100644 index 0000000..3d55803 --- /dev/null +++ b/components/camel-spark/.gitignore @@ -0,0 +1 @@ +metastore_db \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-spark/pom.xml b/components/camel-spark/pom.xml new file mode 100644 index 0000000..2551f22 --- /dev/null +++ b/components/camel-spark/pom.xml @@ -0,0 +1,116 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>components</artifactId> + <groupId>org.apache.camel</groupId> + <version>2.17-SNAPSHOT</version> + </parent> + + <artifactId>camel-spark</artifactId> + <packaging>bundle</packaging> + <name>Camel :: Apache Spark</name> + <description>Camel Apache Spark Support</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.spark.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=spark</camel.osgi.export.service> + + <!-- Spark 1.5.1 requires Jackson 2.4.4 --> + <jackson-version>2.4.4</jackson-version> + + <google-truth-version>0.27</google-truth-version> + <hadoop.version>2.2.0</hadoop.version> + <mongo-hadoop-core.version>1.4.2</mongo-hadoop-core.version> + <spark-version>1.5.1</spark-version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <version>${spark-version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_2.11</artifactId> + <version>${spark-version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + 
<groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <!-- Optional --> + <dependency> + <groupId>org.mongodb.mongo-hadoop</groupId> + <artifactId>mongo-hadoop-core</artifactId> + <version>${mongo-hadoop-core.version}</version> + <optional>true</optional> + </dependency> + + <!-- Testing --> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>${logback-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.google.truth</groupId> + <artifactId>truth</artifactId> + <version>${google-truth-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameCallback.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameCallback.java new file mode 100644 index 0000000..2ead4bc --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameCallback.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import org.apache.spark.sql.DataFrame; + +/** + * Generic block of code with parameters which can be executed against Spark Data Frames and return results. + * + * @param <T> results type + */ +public interface DataFrameCallback<T> { + + /** + * Executes this callback against a Data Frame. + * + * @param dataFrame Data Frame resolved by {@link DataFrameSparkProducer} (message header first, then the endpoint) + * @param payloads message body; a {@link java.util.List} body is passed element by element, any other body as a single payload + * @return callback result, which the producer stores as the message body (a JavaRDD result is collected when the endpoint's {@code collect} option is set) + */ + T onDataFrame(DataFrame dataFrame, Object... payloads); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameSparkProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameSparkProducer.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameSparkProducer.java new file mode 100644 index 0000000..0d40695 --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DataFrameSparkProducer.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.DataFrame; + +import static org.apache.camel.component.spark.SparkConstants.SPARK_DATAFRAME_CALLBACK_HEADER; +import static org.apache.camel.component.spark.SparkConstants.SPARK_DATAFRAME_HEADER; +import static org.apache.camel.component.spark.SparkConstants.SPARK_RDD_HEADER; + +public class DataFrameSparkProducer extends DefaultProducer { + + public DataFrameSparkProducer(SparkEndpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + DataFrame dataFrame = resolveDataFrame(exchange); + DataFrameCallback dataFrameCallback = resolveDataFrameCallback(exchange); + Object body = exchange.getIn().getBody(); + Object result = body instanceof List ? 
dataFrameCallback.onDataFrame(dataFrame, ((List) body).toArray(new Object[0])) : dataFrameCallback.onDataFrame(dataFrame, body); + collectResults(exchange, result); + } + + @Override + public SparkEndpoint getEndpoint() { + return (SparkEndpoint) super.getEndpoint(); + } + + // Helpers + + protected void collectResults(Exchange exchange, Object result) { + if (result instanceof JavaRDD) { + JavaRDD rddResults = (JavaRDD) result; + if (getEndpoint().isCollect()) { + exchange.getIn().setBody(rddResults.collect()); + } else { + exchange.getIn().setBody(result); + exchange.getIn().setHeader(SPARK_RDD_HEADER, result); + } + } else { + exchange.getIn().setBody(result); + } + } + + protected DataFrame resolveDataFrame(Exchange exchange) { + if (exchange.getIn().getHeader(SPARK_DATAFRAME_HEADER) != null) { + return (DataFrame) exchange.getIn().getHeader(SPARK_DATAFRAME_HEADER); + } else if (getEndpoint().getDataFrame() != null) { + return getEndpoint().getDataFrame(); + } else { + throw new IllegalStateException("No Data Frame defined."); + } + } + + protected DataFrameCallback resolveDataFrameCallback(Exchange exchange) { + if (exchange.getIn().getHeader(SPARK_DATAFRAME_CALLBACK_HEADER) != null) { + return (DataFrameCallback) exchange.getIn().getHeader(SPARK_DATAFRAME_CALLBACK_HEADER); + } else if (getEndpoint().getDataFrameCallback() != null) { + return getEndpoint().getDataFrameCallback(); + } else { + throw new IllegalStateException("Cannot resolve Data Frame."); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java new file mode 100644 index 0000000..b471e39 --- /dev/null +++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +/** + * Type of a Spark endpoint, parsed from the URI remainder by {@code SparkComponent} via {@code EndpointType.valueOf(remaining)}. + * {@code SparkEndpoint} creates a {@code RddSparkProducer} for {@link #rdd} and a {@code DataFrameSparkProducer} otherwise. + * Constant names are lower case because {@code valueOf} is case-sensitive and must match the URI text exactly. + */ +public enum EndpointType { + + rdd, dataframe + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddCallback.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddCallback.java new file mode 100644 index 0000000..ed514b6 --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddCallback.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import org.apache.spark.api.java.AbstractJavaRDDLike; + +/** + * Generic block of code with parameters which can be executed against RDD and return results. + * + * @param <T> results type + */ +public interface RddCallback<T> { + + /** + * Executes this callback against an RDD. + * + * @param rdd RDD resolved by {@link RddSparkProducer} (message header first, then the endpoint) + * @param payloads message body; a {@link java.util.List} body is passed element by element, any other body as a single payload + * @return callback result, which the producer stores as the message body (a JavaRDD result is collected when the endpoint's {@code collect} option is set) + */ + T onRdd(AbstractJavaRDDLike rdd, Object... payloads); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddSparkProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddSparkProducer.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddSparkProducer.java new file mode 100644 index 0000000..1977bed --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/RddSparkProducer.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; +import org.apache.spark.api.java.AbstractJavaRDDLike; +import org.apache.spark.api.java.JavaRDD; + +import static org.apache.camel.component.spark.SparkConstants.SPARK_RDD_CALLBACK_HEADER; +import static org.apache.camel.component.spark.SparkConstants.SPARK_RDD_HEADER; + +public class RddSparkProducer extends DefaultProducer { + + public RddSparkProducer(SparkEndpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + AbstractJavaRDDLike rdd = resolveRdd(exchange); + RddCallback rddCallback = resolveRddCallback(exchange); + Object body = exchange.getIn().getBody(); + Object result = body instanceof List ? 
rddCallback.onRdd(rdd, ((List) body).toArray(new Object[0])) : rddCallback.onRdd(rdd, body); + collectResults(exchange, result); + } + + @Override + public SparkEndpoint getEndpoint() { + return (SparkEndpoint) super.getEndpoint(); + } + + // Helpers + + protected void collectResults(Exchange exchange, Object result) { + if (result instanceof JavaRDD) { + JavaRDD rddResults = (JavaRDD) result; + if (getEndpoint().isCollect()) { + exchange.getIn().setBody(rddResults.collect()); + } else { + exchange.getIn().setBody(result); + exchange.getIn().setHeader(SPARK_RDD_HEADER, result); + } + } else { + exchange.getIn().setBody(result); + } + } + + protected AbstractJavaRDDLike resolveRdd(Exchange exchange) { + if (exchange.getIn().getHeader(SPARK_RDD_HEADER) != null) { + return (JavaRDD) exchange.getIn().getHeader(SPARK_RDD_HEADER); + } else if (getEndpoint().getRdd() != null) { + return getEndpoint().getRdd(); + } else { + throw new IllegalStateException("No RDD defined."); + } + } + + protected RddCallback resolveRddCallback(Exchange exchange) { + if (exchange.getIn().getHeader(SPARK_RDD_CALLBACK_HEADER) != null) { + return (RddCallback) exchange.getIn().getHeader(SPARK_RDD_CALLBACK_HEADER); + } else if (getEndpoint().getRddCallback() != null) { + return getEndpoint().getRddCallback(); + } else { + throw new IllegalStateException("Cannot resolve RDD callback."); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkComponent.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkComponent.java new file mode 100644 index 0000000..08c34d6 --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkComponent.java @@ -0,0 
+1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; + +/** + * Camel component that creates {@link SparkEndpoint}s. + */ +public class SparkComponent extends UriEndpointComponent { + + public SparkComponent() { + super(SparkEndpoint.class); + } + + /** + * Creates an endpoint whose {@link EndpointType} is the URI remainder (e.g. {@code rdd} or {@code dataframe}). + * The {@code parameters} map is not read by this method. + * + * @throws IllegalArgumentException if the remainder is not an {@link EndpointType} constant name (case-sensitive) + */ + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + EndpointType type = EndpointType.valueOf(remaining); + return new SparkEndpoint(uri, this, type); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java new file mode 100644 index 0000000..5f96ee8 --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +/** + * Names of the message headers read and written by the Spark producers. + */ +public final class SparkConstants { + + /** Input RDD for {@code RddSparkProducer} (overrides the endpoint's RDD); producers also set it to a non-collected JavaRDD result. */ + public static final String SPARK_RDD_HEADER = "CAMEL_SPARK_RDD"; + + /** {@code RddCallback} to execute, overriding the endpoint's RDD callback. */ + public static final String SPARK_RDD_CALLBACK_HEADER = "CAMEL_SPARK_RDD_CALLBACK"; + + /** Input Data Frame for {@code DataFrameSparkProducer}, overriding the endpoint's Data Frame. */ + public static final String SPARK_DATAFRAME_HEADER = "CAMEL_SPARK_DATAFRAME"; + + /** {@code DataFrameCallback} to execute, overriding the endpoint's Data Frame callback. */ + public static final String SPARK_DATAFRAME_CALLBACK_HEADER = "CAMEL_SPARK_DATAFRAME_CALLBACK"; + + private SparkConstants() { + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java new file mode 100644 index 0000000..8a5247b --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.spark.api.java.AbstractJavaRDDLike; +import org.apache.spark.sql.DataFrame; + +/** + * Producer-only Spark endpoint. Its {@link EndpointType} selects the producer implementation; the RDD, Data Frame + * and callbacks configured here are defaults that the corresponding message headers override. + */ +public class SparkEndpoint extends DefaultEndpoint { + + // Collaborators used by the producers when no overriding message header is present + + private AbstractJavaRDDLike rdd; + + private RddCallback rddCallback; + + private DataFrame dataFrame; + + private DataFrameCallback dataFrameCallback; + + // Configuration + + private final EndpointType endpointType; + + /** Whether a JavaRDD callback result is collected into the message body (default {@code true}). 
*/ + private boolean collect = true; + + public SparkEndpoint(String endpointUri, SparkComponent component, EndpointType endpointType) { + super(endpointUri, component); + this.endpointType = endpointType; + } + + /** + * Creates a {@link RddSparkProducer} for {@link EndpointType#rdd} endpoints and a {@link DataFrameSparkProducer} + * for any other type. + */ + @Override + public Producer createProducer() throws Exception { + if (endpointType == EndpointType.rdd) { 
+ return new RddSparkProducer(this); + } + return new DataFrameSparkProducer(this); + } + + /** + * Always fails: this endpoint cannot consume messages. + */ + @Override + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("Spark component supports producer endpoints only."); + } + + @Override + public boolean isSingleton() { + return true; + } + + // Accessors + + @Override + public SparkComponent getComponent() { + return (SparkComponent) super.getComponent(); + } + + public AbstractJavaRDDLike getRdd() { + return rdd; + } + + public void setRdd(AbstractJavaRDDLike rdd) { + this.rdd = rdd; + } + + public RddCallback getRddCallback() { + return rddCallback; + } + + public void setRddCallback(RddCallback rddCallback) { + this.rddCallback = rddCallback; + } + + public DataFrame getDataFrame() { + return dataFrame; + } + + public void setDataFrame(DataFrame dataFrame) { + this.dataFrame = dataFrame; + } + + public DataFrameCallback getDataFrameCallback() { + return dataFrameCallback; + } + + public void setDataFrameCallback(DataFrameCallback dataFrameCallback) { + this.dataFrameCallback = dataFrameCallback; + } + + public boolean isCollect() { + return collect; + } + + /** + * @param collect {@code true} to turn a JavaRDD callback result into a collected list body; {@code false} to keep + * the RDD itself as the body and in the {@code SparkConstants.SPARK_RDD_HEADER} header + */ + public void setCollect(boolean collect) { + this.collect = collect; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkMongos.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkMongos.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkMongos.java new file mode 
100644 index 0000000..46ebb15 --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkMongos.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import com.mongodb.hadoop.MongoInputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.bson.BSONObject; + +/** + * Helpers for building Spark RDDs backed by MongoDB through mongo-hadoop (an optional dependency of this module). + */ +public final class SparkMongos { + + private SparkMongos() { + } + + /** + * Creates a pair RDD that reads one MongoDB collection with mongo-hadoop's {@link MongoInputFormat}. + * + * @param sparkContext Spark context used to create the RDD + * @param mongoHost MongoDB host name + * @param mongoPort MongoDB port + * @param db database name + * @param collection collection name + * @return RDD of (key, document) pairs as emitted by {@link MongoInputFormat}; keys are typed as {@code Object} + */ + public static JavaPairRDD<Object, BSONObject> mongoRdd(JavaSparkContext sparkContext, String mongoHost, long mongoPort, String db, String collection) { + String inputUri = String.format("mongodb://%s:%d/%s.%s", mongoHost, mongoPort, db, collection); + Configuration hadoopConf = new Configuration(); + hadoopConf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat"); + hadoopConf.set("mongo.input.uri", inputUri); + return sparkContext.newAPIHadoopRDD(hadoopConf, MongoInputFormat.class, Object.class, BSONObject.class); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java new file mode 100644 index 0000000..4a88e8e --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkTransformation.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +/** + * Kinds of Spark transformations ({@code FILTER}, {@code MAP}). + * NOTE(review): none of the sources shown alongside this file reference this enum; its intended use is unconfirmed. + */ +public enum SparkTransformation { + + FILTER, MAP + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/Sparks.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/Sparks.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/Sparks.java new file mode 100644 index 0000000..6021e0c --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/Sparks.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Factory helpers for Spark contexts. + */ +public final class Sparks { + + private static final String LOCAL_MASTER = "local[*]"; + + private static final String LOCAL_APP_NAME = "local-camel-spark-context"; + + private Sparks() { + } + + /** + * Creates a new {@link JavaSparkContext} with master URL {@code local[*]} and application name + * {@code local-camel-spark-context}. Every call constructs a fresh context. + */ + public static JavaSparkContext createLocalSparkContext() { + JavaSparkContext context = new JavaSparkContext(LOCAL_MASTER, LOCAL_APP_NAME); + return context; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java new file mode 100644 index 0000000..cb97c81 --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/TypedRddCallback.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import org.apache.camel.CamelContext; +import org.apache.spark.api.java.AbstractJavaRDDLike; + +public abstract class TypedRddCallback<T> implements RddCallback<T> { + + private final CamelContext camelContext; + + private final Class[] payloadsTypes; + + public TypedRddCallback(CamelContext camelContext, Class[] payloadsTypes) { + this.camelContext = camelContext; + this.payloadsTypes = payloadsTypes; + } + + @Override + public T onRdd(AbstractJavaRDDLike rdd, Object... payloads) { + for (int i = 0; i < payloads.length; i++) { + payloads[i] = camelContext.getTypeConverter().convertTo(payloadsTypes[i], payloads[i]); + } + return doOnRdd(rdd, payloads); + } + + public abstract T doOnRdd(AbstractJavaRDDLike rdd, Object... payloads); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/VoidRddCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/VoidRddCallback.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/VoidRddCallback.java new file mode 100644 index 0000000..24e62bd --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/VoidRddCallback.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +import org.apache.spark.api.java.AbstractJavaRDDLike; + +public abstract class VoidRddCallback implements RddCallback<Void> { + + public abstract void doOnRdd(AbstractJavaRDDLike rdd, Object... payloads); + + @Override + public Void onRdd(AbstractJavaRDDLike rdd, Object... payloads) { + doOnRdd(rdd, payloads); + return null; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java new file mode 100644 index 0000000..8a988f4 --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark.annotations; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Arrays.asList; + +import org.apache.spark.api.java.AbstractJavaRDDLike; + +import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation; + +public final class AnnotatedRddCallback { + + private AnnotatedRddCallback() { + } + + public static org.apache.camel.component.spark.RddCallback annotatedRddCallback(final Object callback) { + final List<Method> rddCallbacks = findMethodsWithAnnotation(callback.getClass(), RddCallback.class); + if (rddCallbacks.size() > 0) { + return new org.apache.camel.component.spark.RddCallback() { + @Override + public Object onRdd(AbstractJavaRDDLike rdd, Object... 
payloads) { + try { + List<Object> arguments = new ArrayList<>(payloads.length + 1); + arguments.add(rdd); + arguments.addAll(asList(payloads)); + if (arguments.get(1) == null) { + arguments.remove(1); + } + + Method callbackMethod = rddCallbacks.get(0); + callbackMethod.setAccessible(true); + return callbackMethod.invoke(callback, arguments.toArray(new Object[arguments.size()])); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + }; + } + throw new UnsupportedOperationException("Can't find methods annotated with @Rdd."); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/RddCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/RddCallback.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/RddCallback.java new file mode 100644 index 0000000..61dcae4 --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/RddCallback.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark.annotations; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.PARAMETER }) +@Inherited +public @interface RddCallback { +} http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/main/resources/META-INF/services/org/apache/camel/component/spark ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/resources/META-INF/services/org/apache/camel/component/spark b/components/camel-spark/src/main/resources/META-INF/services/org/apache/camel/component/spark new file mode 100644 index 0000000..db7a2dd --- /dev/null +++ b/components/camel-spark/src/main/resources/META-INF/services/org/apache/camel/component/spark @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +class=org.apache.camel.component.spark.SparkComponent http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java new file mode 100644 index 0000000..4b2ac18 --- /dev/null +++ b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.spark; + +import java.io.File; +import java.io.IOException; + +import static java.util.Arrays.asList; + +import com.google.common.truth.Truth; +import org.apache.camel.component.spark.annotations.RddCallback; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.spark.api.java.AbstractJavaRDDLike; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.hive.HiveContext; +import org.junit.Test; + +import static org.apache.camel.component.spark.SparkConstants.SPARK_DATAFRAME_CALLBACK_HEADER; +import static org.apache.camel.component.spark.SparkConstants.SPARK_RDD_CALLBACK_HEADER; +import static org.apache.camel.component.spark.Sparks.createLocalSparkContext; +import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback; + +public class SparkProducerTest extends CamelTestSupport { + + // Fixtures + + static JavaSparkContext sparkContext = createLocalSparkContext(); + + static HiveContext hiveContext = new HiveContext(sparkContext.sc()); + + String sparkUri = "spark:rdd?rdd=#pomRdd"; + + String sparkDataFrameUri = "spark:dataframe?dataFrame=#jsonCars"; + + // Routes fixtures + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + + registry.bind("pomRdd", sparkContext.textFile("testrdd.txt")); + + DataFrame jsonCars = hiveContext.read().json("src/test/resources/cars.json"); + jsonCars.registerTempTable("cars"); + registry.bind("jsonCars", jsonCars); + + registry.bind("countLinesTransformation", new org.apache.camel.component.spark.RddCallback() { + @Override + public Object onRdd(AbstractJavaRDDLike rdd, Object... 
payloads) { + return rdd.count(); + } + }); + return registry; + } + + // Tests + + @Test + public void shouldExecuteRddCallback() { + long pomLinesCount = template.requestBodyAndHeader(sparkUri, null, SPARK_RDD_CALLBACK_HEADER, new org.apache.camel.component.spark.RddCallback() { + @Override + public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) { + return rdd.count(); + } + }, Long.class); + Truth.assertThat(pomLinesCount).isEqualTo(17); + } + + @Test + public void shouldExecuteRddCallbackWithSinglePayload() { + long pomLinesCount = template.requestBodyAndHeader(sparkUri, 10, SPARK_RDD_CALLBACK_HEADER, new org.apache.camel.component.spark.RddCallback() { + @Override + public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) { + return rdd.count() * (int) payloads[0]; + } + }, Long.class); + Truth.assertThat(pomLinesCount).isEqualTo(170); + } + + @Test + public void shouldExecuteRddCallbackWithPayloads() { + long pomLinesCount = template.requestBodyAndHeader(sparkUri, asList(10, 10), SPARK_RDD_CALLBACK_HEADER, new org.apache.camel.component.spark.RddCallback() { + @Override + public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) { + return rdd.count() * (int) payloads[0] * (int) payloads[1]; + } + }, Long.class); + Truth.assertThat(pomLinesCount).isEqualTo(1700); + } + + @Test + public void shouldExecuteRddCallbackWithTypedPayloads() { + TypedRddCallback rddCallback = new TypedRddCallback<Long>(context, new Class[]{int.class, int.class}) { + @Override + public Long doOnRdd(AbstractJavaRDDLike rdd, Object... 
payloads) { + return rdd.count() * (int) payloads[0] * (int) payloads[1]; + } + }; + long pomLinesCount = template.requestBodyAndHeader(sparkUri, asList("10", "10"), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class); + Truth.assertThat(pomLinesCount).isEqualTo(1700); + } + + @Test + public void shouldUseTransformationFromRegistry() { + long pomLinesCount = template.requestBody(sparkUri + "&rddCallback=#countLinesTransformation", null, Long.class); + Truth.assertThat(pomLinesCount).isGreaterThan(0L); + } + + @Test + public void shouldExecuteVoidCallback() throws IOException { + // Given + final File output = File.createTempFile("camel", "spark"); + output.delete(); + + // When + template.sendBodyAndHeader(sparkUri, null, SPARK_RDD_CALLBACK_HEADER, new VoidRddCallback() { + @Override + public void doOnRdd(AbstractJavaRDDLike rdd, Object... payloads) { + rdd.saveAsTextFile(output.getAbsolutePath()); + } + }); + + // Then + Truth.assertThat(output.length()).isGreaterThan(0L); + } + + @Test + public void shouldExecuteAnnotatedCallback() { + org.apache.camel.component.spark.RddCallback rddCallback = annotatedRddCallback(new Object() { + @RddCallback + long countLines(JavaRDD<String> textFile) { + return textFile.count(); + } + }); + long pomLinesCount = template.requestBodyAndHeader(sparkUri, null, SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class); + Truth.assertThat(pomLinesCount).isEqualTo(17); + } + + @Test + public void shouldExecuteAnnotatedVoidCallback() throws IOException { + // Given + final File output = File.createTempFile("camel", "spark"); + output.delete(); + org.apache.camel.component.spark.RddCallback rddCallback = annotatedRddCallback(new Object() { + @RddCallback + void countLines(JavaRDD<String> textFile) { + textFile.saveAsTextFile(output.getAbsolutePath()); + } + }); + + // When + template.sendBodyAndHeader(sparkUri, null, SPARK_RDD_CALLBACK_HEADER, rddCallback); + + + // Then + Truth.assertThat(output.length()).isGreaterThan(0L); + } + + // Hive 
tests + + @Test + public void shouldExecuteHiveQuery() { + DataFrameCallback callback = new DataFrameCallback<Long>() { + @Override + public Long onDataFrame(DataFrame dataFrame, Object... payloads) { + return hiveContext.sql("SELECT * FROM cars").count(); + } + }; + long tablesCount = template.requestBodyAndHeader(sparkDataFrameUri, null, SPARK_DATAFRAME_CALLBACK_HEADER, callback, Long.class); + Truth.assertThat(tablesCount).isEqualTo(2); + } + + // Data frames tests + + @Test + public void shouldCountFrame() { + DataFrameCallback callback = new DataFrameCallback<Long>() { + @Override + public Long onDataFrame(DataFrame dataFrame, Object... payloads) { + return dataFrame.count(); + } + }; + long tablesCount = template.requestBodyAndHeader(sparkDataFrameUri, null, SPARK_DATAFRAME_CALLBACK_HEADER, callback, Long.class); + Truth.assertThat(tablesCount).isEqualTo(2); + } + + @Test + public void shouldExecuteConditionalFrameCount() { + DataFrameCallback callback = new DataFrameCallback<Long>() { + @Override + public Long onDataFrame(DataFrame dataFrame, Object... 
payloads) { + String model = (String) payloads[0]; + return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count(); + } + }; + long tablesCount = template.requestBodyAndHeader(sparkDataFrameUri, "Micra", SPARK_DATAFRAME_CALLBACK_HEADER, callback, Long.class); + Truth.assertThat(tablesCount).isEqualTo(1); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/src/test/resources/cars.json ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/test/resources/cars.json b/components/camel-spark/src/test/resources/cars.json new file mode 100644 index 0000000..27d22de --- /dev/null +++ b/components/camel-spark/src/test/resources/cars.json @@ -0,0 +1,2 @@ +{"brand": "nissan", "model": "X-trail"} +{"brand": "nissan", "model": "Micra"} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/camel-spark/testrdd.txt ---------------------------------------------------------------------- diff --git a/components/camel-spark/testrdd.txt b/components/camel-spark/testrdd.txt new file mode 100644 index 0000000..6a1fa8e --- /dev/null +++ b/components/camel-spark/testrdd.txt @@ -0,0 +1,17 @@ +# Licensed to the Rhiot under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +foo bar +baz qux \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b46392c2/components/pom.xml ---------------------------------------------------------------------- diff --git a/components/pom.xml b/components/pom.xml index 8c0f342..fe40980 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -209,6 +209,7 @@ <module>camel-slack</module> <module>camel-soap</module> <module>camel-solr</module> + <module>camel-spark</module> <module>camel-spark-rest</module> <module>camel-splunk</module> <module>camel-spring-batch</module>