ignite-948 Add Java API for Ignite RDD
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4d36d123 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4d36d123 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4d36d123 Branch: refs/heads/ignite-1009-v4 Commit: 4d36d12361b78aa79517addce2a33fd772a0201e Parents: ac9dd30 Author: agura <ag...@gridgain.com> Authored: Tue Jun 2 01:09:17 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Thu Jun 4 22:15:42 2015 +0300 ---------------------------------------------------------------------- examples/config/example-ignite.xml | 4 +- modules/spark/pom.xml | 14 + .../spark/examples/java/ColocationTest.java | 89 ++++++ .../examples/java/ExampleConfiguration.java | 31 ++ .../examples/java/IgniteProcessExample.java | 80 +++++ .../spark/examples/java/IgniteStoreExample.java | 68 +++++ .../spark/examples/java/package-info.java | 21 ++ .../org/apache/ignite/spark/IgniteRDD.scala | 10 +- .../apache/ignite/spark/JavaIgniteContext.scala | 63 ++++ .../org/apache/ignite/spark/JavaIgniteRDD.scala | 99 ++++++ .../ignite/spark/examples/ColocationTest.scala | 5 +- .../spark/impl/JavaIgniteAbstractRDD.scala | 34 +++ .../ignite/spark/JavaIgniteRDDSelfTest.java | 298 +++++++++++++++++++ parent/pom.xml | 4 + 14 files changed, 811 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/examples/config/example-ignite.xml ---------------------------------------------------------------------- diff --git a/examples/config/example-ignite.xml b/examples/config/example-ignite.xml index e746e59..dcb2ba8 100644 --- a/examples/config/example-ignite.xml +++ b/examples/config/example-ignite.xml @@ -30,14 +30,16 @@ http://www.springframework.org/schema/util/spring-util.xsd"> <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> 
<!-- Set to true to enable distributed class loading for examples, default is false. --> +<!-- <property name="peerClassLoadingEnabled" value="true"/> <property name="marshaller"> <bean class="org.apache.ignite.marshaller.optimized.OptimizedMarshaller"> - <!-- Set to false to allow non-serializable objects in examples, default is true. --> + <!– Set to false to allow non-serializable objects in examples, default is true. –> <property name="requireSerializable" value="false"/> </bean> </property> +--> <!-- Enable task execution events for examples. --> <property name="includeEventTypes"> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml index c22a52b..8900a10 100644 --- a/modules/spark/pom.xml +++ b/modules/spark/pom.xml @@ -87,6 +87,20 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java new file mode 100644 index 0000000..20d6e88 --- /dev/null +++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.examples.java; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spark.*; +import org.apache.spark.*; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.*; + +import scala.Tuple2; + +import java.util.*; + +/** + * Colocation test example: saves {@code KEYS_CNT} (i, i) pairs into the "partitioned" cache and compares a locally computed sum with the distributed one. + */ +public class ColocationTest { + /** Keys count. */ + private static final int KEYS_CNT = 10000; + + /** To pair function. */ + private static final IgniteClosure<Integer, Tuple2<Integer, Integer>> TO_PAIR_F = + new IgniteClosure<Integer, Tuple2<Integer, Integer>>() { + @Override public Tuple2<Integer, Integer> apply(Integer i) { + return new Tuple2<>(i, i); + } + }; + + /** To value function. */ + private static final Function<Tuple2<Integer, Integer>, Integer> TO_VALUE_F = + new Function<Tuple2<Integer, Integer>, Integer>() { + /** {@inheritDoc} */ + @Override public Integer call(Tuple2<Integer, Integer> t) throws Exception { + return t._2(); + } + }; + + /** Sum function.
*/ + private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() { + @Override public Integer call(Integer x, Integer y) { + return x + y; + } + }; + + /** + * @param args Args. + */ + public static void main(String[] args) { + SparkConf conf = new SparkConf(); + + conf.setAppName("Colocation test"); + + JavaSparkContext sc = new JavaSparkContext(conf); + + JavaIgniteContext<Integer, Integer> ignite = new JavaIgniteContext<>(sc, new ExampleConfiguration()); + + JavaIgniteRDD<Integer, Integer> cache = ignite.fromCache("partitioned"); + + List<Integer> seq = F.range(0, KEYS_CNT); // Exactly KEYS_CNT keys: 0..KEYS_CNT-1 (upper bound is exclusive). + + JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(F.transformList(seq, TO_PAIR_F), 48); + + cache.savePairs(rdd); + + int sum = (KEYS_CNT * KEYS_CNT - KEYS_CNT) / 2; // Sum of 0..KEYS_CNT-1, matching the stored values. + + // Compare the local sum with the one computed by Spark over the Ignite cache. + System.out.println("Local sum: " + sum); + + System.out.println("Distributed sum: " + cache.map(TO_VALUE_F).fold(0, SUM_F)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java new file mode 100644 index 0000000..5d769f2 --- /dev/null +++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.examples.java; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; + +/** + * Ignite example configuration provider. Delegates to {@code org.apache.ignite.spark.examples.ExampleConfiguration.configuration()}. + */ +public class ExampleConfiguration implements IgniteOutClosure<IgniteConfiguration> { + /** {@inheritDoc} */ + @Override public IgniteConfiguration apply() { + return org.apache.ignite.spark.examples.ExampleConfiguration.configuration(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java new file mode 100644 index 0000000..8994355 --- /dev/null +++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.examples.java; + +import org.apache.ignite.spark.*; +import org.apache.spark.*; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.*; + +import scala.*; + +import java.lang.Boolean; + +/** + * Ignite process example: filters cached lines containing "Ignite", saves them into the "results" cache and runs SQL queries against the "indexed" cache. + */ +public class IgniteProcessExample { + /** Filter function. */ + private static final Function<Tuple2<Object, String>, Boolean> FILTER_F = + new Function<Tuple2<Object, String>, Boolean>() { + @Override public Boolean call(Tuple2<Object, String> t) throws Exception { + System.out.println("Analyzing line: " + t._2()); + + return t._2().contains("Ignite"); + } + }; + + /** To value function. */ + private static final Function<Tuple2<Object, String>, String> TO_VALUE_F = + new Function<Tuple2<Object, String>, String>() { + @Override public String call(Tuple2<Object, String> t) throws Exception { + return t._2(); + } + }; + + /** + * @param args Args. + */ + public static void main(String[] args) { + SparkConf conf = new SparkConf(); + + conf.setAppName("Ignite processing example"); + + JavaSparkContext sc = new JavaSparkContext(conf); + + JavaIgniteContext<Object, String> ignite = new JavaIgniteContext<>(sc, new ExampleConfiguration()); + + // Search for lines containing "Ignite". + JavaIgniteRDD<Object, String> scanRdd = ignite.fromCache("partitioned"); + + JavaRDD<String> processedRdd = scanRdd.filter(FILTER_F).map(TO_VALUE_F); + + // Create a new cache for results.
+ JavaIgniteRDD<Object, String> results = ignite.fromCache("results"); + + results.saveValues(processedRdd); + + // SQL query + System.out.println("SQL query result: " + ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect()); + + // SQL fields query + ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20).show(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java new file mode 100644 index 0000000..24ae77f --- /dev/null +++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.spark.examples.java; + +import org.apache.ignite.spark.*; +import org.apache.spark.*; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.*; + +import scala.*; + +import java.lang.Boolean; + +/** + * Ignite store example: reads the text file given by {@code args[0]}, keeps lines containing "Ignite" and saves them into the "partitioned" cache. + */ +public class IgniteStoreExample { + /** Keeps only lines containing "Ignite". */ + private static final Function<String, Boolean> PREDICATE = new Function<String, Boolean>() { + @Override public Boolean call(String s) throws Exception { + System.out.println("Read line: " + s); + + return s.contains("Ignite"); + } + }; + + /** Maps a line to a (line, line) pair. */ + private static final PairFunction<String, String, String> TO_PAIR_F = new PairFunction<String, String, String>() { + @Override public Tuple2<String, String> call(String s) throws Exception { + return new Tuple2<>(s, s); + } + }; + + /** + * @param args Args. + */ + public static void main(String[] args) { + SparkConf conf = new SparkConf(); + + conf.setAppName("Ignite store example"); + + JavaSparkContext sc = new JavaSparkContext(conf); + + JavaIgniteContext<String, String> ignite = new JavaIgniteContext<>(sc, new ExampleConfiguration()); + + JavaRDD<String> lines = sc.textFile(args[0]).filter(PREDICATE); + + ignite.fromCache("partitioned").saveValues(lines); + + ignite.fromCache("partitioned").savePairs(lines.mapToPair(TO_PAIR_F)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java new file mode 100644 index 0000000..e3243bf --- /dev/null +++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java @@ -0,0 +1,21 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Demonstrates usage of Ignite and Spark from Java. + */ +package org.apache.ignite.spark.examples.java; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 0b8e845..742d7ee 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -43,9 +43,9 @@ import scala.collection.JavaConversions._ * @tparam V Value type. */ class IgniteRDD[K, V] ( - ic: IgniteContext[K, V], - cacheName: String, - cacheCfg: CacheConfiguration[K, V] + val ic: IgniteContext[K, V], + val cacheName: String, + val cacheCfg: CacheConfiguration[K, V] ) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) { /** * Computes iterator based on given partition. @@ -73,7 +73,7 @@ class IgniteRDD[K, V] ( * * @return Partitions. 
*/ - override protected def getPartitions: Array[Partition] = { + override protected[spark] def getPartitions: Array[Partition] = { ensureCache() val parts = ic.ignite().affinity(cacheName).partitions() @@ -87,7 +87,7 @@ class IgniteRDD[K, V] ( * @param split Split partition. * @return */ - override protected def getPreferredLocations(split: Partition): Seq[String] = { + override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = { ensureCache() ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala new file mode 100644 index 0000000..e2d57bf --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark + +import org.apache.ignite.Ignite +import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.lang.IgniteOutClosure +import org.apache.spark.api.java.JavaSparkContext + +import scala.reflect.ClassTag + +/** + * Java-friendly Ignite context wrapper: owns an [[IgniteContext]] and exposes Ignite caches as [[JavaIgniteRDD]] instances. + * + * @param sc Java Spark context. + * @param cfgF Configuration factory. + * @tparam K Key type. + * @tparam V Value type. + */ +class JavaIgniteContext[K, V]( + @scala.transient val sc: JavaSparkContext, + val cfgF: IgniteOutClosure[IgniteConfiguration]) extends Serializable { + + @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply()) + + def this(sc: JavaSparkContext, springUrl: String) { + this(sc, new IgniteOutClosure[IgniteConfiguration] { + override def apply() = IgnitionEx.loadConfiguration(springUrl).get1() + }) + } + + def fromCache(cacheName: String): JavaIgniteRDD[K, V] = + JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null)) + + def fromCache(cacheCfg: CacheConfiguration[K, V]): JavaIgniteRDD[K, V] = + JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg)) + + def ignite(): Ignite = ic.ignite() + + def close() = ic.close() + + private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] + + implicit val ktag: ClassTag[K] = fakeClassTag + + implicit val vtag: ClassTag[V] = fakeClassTag +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala new file mode 100644 index 0000000..2e8702e --- /dev/null +++
b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark + +import java.util + +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame +import org.apache.spark.{Partition, TaskContext} + +import scala.annotation.varargs +import scala.collection.JavaConversions._ +import scala.language.implicitConversions +import scala.reflect.ClassTag + +/** + * Java-friendly Ignite RDD wrapper. Represents Ignite cache as Java Spark RDD abstraction. + * + * @param rdd Ignite RDD instance. + * @tparam K Key type. + * @tparam V Value type. + */ +class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V]) + extends JavaPairRDD[K, V](rdd)(JavaIgniteRDD.fakeClassTag, JavaIgniteRDD.fakeClassTag) { + + override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) + + override val classTag: ClassTag[(K, V)] = JavaIgniteRDD.fakeClassTag + + /** + * Computes iterator based on given partition. + * + * @param part Partition to use. + * @param context Task context. + * @return Partition iterator.
+ */ + def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { + rdd.compute(part, context) + } + + /** + * Gets partitions for the given cache RDD. + * + * @return Partitions. + */ + protected def getPartitions: java.util.List[Partition] = { + new util.ArrayList[Partition](rdd.getPartitions.toSeq) + } + + /** + * Gets preferred locations for the given partition. + * + * @param split Split partition. + * @return Preferred locations reported by the wrapped IgniteRDD for this partition. + */ + protected def getPreferredLocations(split: Partition): Seq[String] = { + rdd.getPreferredLocations(split) + } + + @varargs def objectSql(typeName: String, sql: String, args: Any*): JavaPairRDD[K, V] = + JavaPairRDD.fromRDD(rdd.objectSql(typeName, sql, args:_*)) + + @varargs def sql(sql: String, args: Any*): DataFrame = rdd.sql(sql, args:_*) + + def saveValues(jrdd: JavaRDD[V]): Unit = rdd.saveValues(JavaRDD.toRDD(jrdd)) + + def savePairs(jrdd: JavaPairRDD[K, V]): Unit = { + val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd) + + rdd.savePairs(rrdd) + } + + def clear(): Unit = rdd.clear() +} + +object JavaIgniteRDD { + implicit def fromIgniteRDD[K: ClassTag, V: ClassTag](rdd: IgniteRDD[K, V]): JavaIgniteRDD[K, V] = + new JavaIgniteRDD[K, V](rdd) + + implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = rdd.rdd + + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala index e1d3d8e..29587e4 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala @@ -17,16 +17,15 @@ package
org.apache.ignite.spark.examples -import org.apache.ignite.configuration.IgniteConfiguration import org.apache.ignite.spark.IgniteContext -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkConf, SparkContext} object ColocationTest { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Colocation test") val sc = new SparkContext(conf) - val ignite = new IgniteContext[Int, Int](sc, () ⇒ new IgniteConfiguration()) + val ignite = new IgniteContext[Int, Int](sc, ExampleConfiguration.configuration _) // Search for lines containing "Ignite". val cache = ignite.fromCache("partitioned") http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala new file mode 100644 index 0000000..13bd3e8 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl + +import org.apache.ignite.IgniteCache +import org.apache.ignite.spark.IgniteRDD +import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike} + +abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V]) + extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { + + protected def ensureCache(): IgniteCache[K, V] = { + // Make sure the cache is deployed: create it from the wrapped RDD's explicit configuration if one was given, otherwise get or create it by cache name. + if (rdd.cacheCfg != null) + rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg) + else + rdd.ic.ignite().getOrCreateCache(rdd.cacheName) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java new file mode 100644 index 0000000..e14abfc --- /dev/null +++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.sql.*; + +import scala.*; + +import java.util.*; + +/** + * Tests for {@link JavaIgniteRDD}. + */ +public class JavaIgniteRDDSelfTest extends GridCommonAbstractTest { + /** Grid count. */ + private static final int GRID_CNT = 3; + + /** Keys count. */ + private static final int KEYS_CNT = 10000; + + /** Cache name. */ + private static final String PARTITIONED_CACHE_NAME = "partitioned"; + + /** Ip finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Sum function. */ + private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() { + public Integer call(Integer x, Integer y) { + return x + y; + } + }; + + /** To pair function. */ + private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() { + /** {@inheritDoc} */ + @Override public Tuple2<String, String> call(Integer i) { + return new Tuple2<>(String.valueOf(i), "val" + i); + } + }; + + /** (String, Integer); pair to Integer value function. */ + private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>(); + + /** (String, Entity) pair to Entity value function. 
*/ + private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F = + new PairToValueFunction<>(); + + /** Maps {@code i} to the pair (String.valueOf(i), new Entity(i, "name" + i, i * 100)). */ + private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F = + new PairFunction<Integer, String, Entity>() { + @Override public Tuple2<String, Entity> call(Integer i) throws Exception { + return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100)); + } + }; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + Ignition.stop("client", false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < GRID_CNT; i++) + Ignition.start(getConfiguration("grid-" + i, false)); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + for (int i = 0; i < GRID_CNT; i++) + Ignition.stop("grid-" + i, false); + } + + /** + * @throws Exception If failed. + */ + public void testStoreDataToIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + ic.fromCache(PARTITIONED_CACHE_NAME) + .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F)); + + Ignite ignite = Ignition.ignite("grid-0"); + + IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME); + + for (int i = 0; i < KEYS_CNT; i++) { + String val = cache.get(String.valueOf(i)); + + assertNotNull("Value was not put to cache for key: " + i, val); + assertEquals("Invalid value stored for key: " + i, "val" + i, val); + } + } + finally { + sc.stop(); + } + } + + /** + * @throws Exception If failed.
+ */ + public void testReadDataFromIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + Ignite ignite = Ignition.ignite("grid-0"); + + IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME); + + for (int i = 0; i < KEYS_CNT; i++) + cache.put(String.valueOf(i), i); + + JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F); + + int sum = values.fold(0, SUM_F); + + int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT; + + assertEquals(expSum, sum); + } + finally { + sc.stop(); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueryObjectsFromIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); + + cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F)); + + List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000) + .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect(); + + assertEquals("Invalid result length", 1, res.size()); + assertEquals("Invalid result", 50, res.get(0).id()); + assertEquals("Invalid result", "name50", res.get(0).name()); + assertEquals("Invalid result", 5000, res.get(0).salary()); + assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count()); + } + finally { + sc.stop(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testQueryFieldsFromIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); + + cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F)); + + DataFrame df = + cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000); + + df.printSchema(); + + Row[] res = df.collect(); + + assertEquals("Invalid result length", 1, res.length); + assertEquals("Invalid result", 50, res[0].get(0)); + assertEquals("Invalid result", "name50", res[0].get(1)); + assertEquals("Invalid result", 5000, res[0].get(2)); + + Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000)); + + DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp); + + df.printSchema(); + + Row[] res0 = df0.collect(); + + assertEquals("Invalid result length", 1, res0.length); + assertEquals("Invalid result", 50, res0[0].get(0)); + assertEquals("Invalid result", "name50", res0[0].get(1)); + assertEquals("Invalid result", 5000, res0[0].get(2)); + + assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count()); + } + finally { + sc.stop(); + } + + } + + /** + * @param gridName Grid name. + * @param client Client. + */ + private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception { + IgniteConfiguration cfg = new IgniteConfiguration(); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(cacheConfiguration()); + + cfg.setClientMode(client); + + cfg.setGridName(gridName); + + return cfg; + } + + /** + * Creates cache configuration. 
+ */ + private static CacheConfiguration<Object, Object> cacheConfiguration() { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setBackups(1); + + ccfg.setName(PARTITIONED_CACHE_NAME); + + ccfg.setIndexedTypes(String.class, Entity.class); + + return ccfg; + } + + /** + * Ignite configiration provider. + */ + static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> { + /** {@inheritDoc} */ + @Override public IgniteConfiguration apply() { + try { + return getConfiguration("client", true); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * @param <K> + * @param <V> + */ + static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> { + /** {@inheritDoc} */ + @Override public V call(Tuple2<K, V> t) throws Exception { + return t._2(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index a514e35..f5b73df 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -321,6 +321,10 @@ <title>Mesos Framework</title> <packages>org.apache.ignite.mesos*</packages> </group> + <group> + <title>Spark Integration</title> + <packages>org.apache.ignite.spark.examples.java</packages> + </group> </groups> <header> <![CDATA[