IGNITE-948 Add Java API for Ignite RDD
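A minimal usage sketch of the Java API introduced here (JavaIgniteContext / JavaIgniteRDD), modeled on the bundled examples in this patch. It assumes the ExampleConfiguration helper from the existing org.apache.ignite.spark.examples module and the "partitioned" cache that those examples use; the sketch class itself is illustrative and not part of the patch.

import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.ignite.spark.examples.ExampleConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.Arrays;

/** Illustrative sketch of the Java API for Ignite RDD. */
public class JavaIgniteRddSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Java Ignite RDD sketch");

        JavaSparkContext sc = new JavaSparkContext("local[*]", "Java Ignite RDD sketch", conf);

        // Create an Ignite context over the Spark context, supplying the Ignite configuration via a closure.
        JavaIgniteContext<Integer, Integer> ic = new JavaIgniteContext<>(sc, new IgniteOutClosure<IgniteConfiguration>() {
            @Override public IgniteConfiguration apply() {
                return ExampleConfiguration.configuration();
            }
        });

        // Expose the "partitioned" cache as a Spark pair RDD.
        JavaIgniteRDD<Integer, Integer> igniteRdd = ic.fromCache("partitioned");

        // Write a small pair RDD into the cache through the RDD view.
        JavaPairRDD<Integer, Integer> pairs = sc.parallelize(Arrays.asList(1, 2, 3))
            .mapToPair(new PairFunction<Integer, Integer, Integer>() {
                @Override public Tuple2<Integer, Integer> call(Integer i) {
                    return new Tuple2<>(i, i * i);
                }
            });

        igniteRdd.savePairs(pairs);

        // Read back through the same view (count() comes from the JavaPairRDD base class).
        System.out.println("Entries in cache: " + igniteRdd.count());

        ic.close();
        sc.stop();
    }
}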
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29c6d28a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29c6d28a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29c6d28a Branch: refs/heads/ignite-948 Commit: 29c6d28a530a09b4724c7f3463dc193090d62de6 Parents: 2aa1ace Author: agura <ag...@gridgain.com> Authored: Tue Jun 2 01:09:17 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Wed Jun 3 18:36:00 2015 +0300 ---------------------------------------------------------------------- modules/spark/pom.xml | 14 ++ .../spark/examples/java/ColocationTest.java | 88 +++++++ .../examples/java/IgniteProcessExample.java | 82 +++++++ .../spark/examples/java/IgniteStoreExample.java | 72 ++++++ .../spark/examples/java/package-info.java | 21 ++ .../org/apache/ignite/spark/IgniteRDD.scala | 10 +- .../apache/ignite/spark/JavaIgniteContext.scala | 55 +++++ .../org/apache/ignite/spark/JavaIgniteRDD.scala | 94 +++++++ .../spark/impl/JavaIgniteAbstractRDD.scala | 34 +++ .../ignite/spark/JavaIgniteRDDSelfTest.java | 244 +++++++++++++++++++ parent/pom.xml | 4 + 11 files changed, 713 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml index a4a25f5..a36ee87 100644 --- a/modules/spark/pom.xml +++ b/modules/spark/pom.xml @@ -87,6 +87,20 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java new file mode 100644 index 0000000..dd687cf --- /dev/null +++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.examples.java; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spark.*; +import org.apache.ignite.spark.examples.*; +import org.apache.spark.*; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.*; + +import scala.Tuple2; + +import java.util.*; + +/** + * Colocation test example. + */ +public class ColocationTest { + /** + * @param args Args. + */ + public static void main(String[] args) { + SparkConf conf = new SparkConf(); + + conf.setAppName("Colocation test"); + + JavaSparkContext sc = new JavaSparkContext("local[*]", "Colocation test", conf); + + JavaIgniteContext<Integer, Integer> ignite = new JavaIgniteContext<>(sc, new IgniteOutClosure<IgniteConfiguration>() { + @Override public IgniteConfiguration apply() { + return ExampleConfiguration.configuration(); + } + }); + + JavaIgniteRDD<Integer, Integer> cache = ignite.fromCache("partitioned"); + + List<Integer> seq = new ArrayList<>(); + + long sum = 0; + + for (int i = 0; i < 100000; i++) { + seq.add(i); + + sum += i; + } + + IgniteClosure<Integer, Tuple2<Integer, Integer>> f = new IgniteClosure<Integer, Tuple2<Integer, Integer>>() { + @Override public Tuple2<Integer, Integer> apply(Integer i) { + return new Tuple2<>(i, i); + } + }; + + JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(F.transformList(seq, f), 48); + + cache.savePairs(rdd); + + // Execute parallel sum. + System.out.println("Local sum: " + sum); + + System.out.println("Distributed sum: " + cache.map(new Function<Tuple2<Integer,Integer>, Integer>() { + @Override public Integer call(Tuple2<Integer, Integer> t) throws Exception { + return t._2(); + } + }).fold(0, new Function2<Integer, Integer, Integer>() { + public Integer call(Integer x, Integer y) { + return x + y; + } + })); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java new file mode 100644 index 0000000..e69ca5f --- /dev/null +++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.examples.java; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spark.*; +import org.apache.ignite.spark.examples.*; +import org.apache.spark.*; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.*; + +import scala.*; + +import java.lang.Boolean; +import java.util.*; + +/** + * Ignite process example. + */ +public class IgniteProcessExample { + /** + * @param args Args. + */ + public static void main(String[] args) { + SparkConf conf = new SparkConf(); + + conf.setAppName("Ignite processing example"); + + JavaSparkContext sc = new JavaSparkContext("local[*]", "Ignite processing example", conf); + + JavaIgniteContext<Object, String> ignite = new JavaIgniteContext<>(sc, new IgniteOutClosure<IgniteConfiguration>() { + @Override public IgniteConfiguration apply() { + return ExampleConfiguration.configuration(); + } + }); + + // Search for lines containing "Ignite". + JavaIgniteRDD<Object, String> scanRdd = ignite.fromCache("partitioned"); + + JavaRDD<String> processedRdd = scanRdd.filter(new Function<Tuple2<Object, String>, Boolean>() { + @Override public Boolean call(Tuple2<Object, String> t) throws Exception { + System.out.println("Analyzing line: " + t._2()); + + t._2().contains("Ignite"); + + return true; + } + }).map(new Function<Tuple2<Object, String>, String>() { + @Override public String call(Tuple2<Object, String> t) throws Exception { + return t._2(); + } + }); + + // Create a new cache for results. + JavaIgniteRDD<Object, String> results = ignite.fromCache("results"); + + results.saveValues(processedRdd); + + // SQL query + ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect(); + + // SQL fields query + DataFrame df = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java new file mode 100644 index 0000000..c1299ef --- /dev/null +++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.examples.java; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spark.*; +import org.apache.ignite.spark.examples.*; +import org.apache.spark.*; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.*; +import org.apache.spark.api.java.function.Function; + +import scala.*; + +import java.lang.*; +import java.lang.Boolean; + +/** + * Ignite store example. + */ +public class IgniteStoreExample { + /** + * @param args Args. + */ + public static void main(String[] args) { + SparkConf conf = new SparkConf(); + + conf.setAppName("Ignite processing example"); + + JavaSparkContext sc = new JavaSparkContext("local[*]", "Ignite processing example", conf); + + JavaIgniteContext<String, String> ignite = new JavaIgniteContext<>(sc, new IgniteOutClosure<IgniteConfiguration>() { + @Override public IgniteConfiguration apply() { + return ExampleConfiguration.configuration(); + } + }); + + + JavaRDD<String> lines = sc.textFile(args[0]).filter(new Function<String, Boolean>() { + @Override public Boolean call(String s) throws Exception { + System.out.println("Read line: " + s); + + return s.contains("Ignite"); + } + }); + + ignite.fromCache("partitioned").saveValues(lines); + + ignite.fromCache("partitioned").savePairs(lines.mapToPair(new PairFunction<String, String, String>() { + @Override public Tuple2<String, String> call(String s) throws Exception { + return new Tuple2<>(s, s); + } + })); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java new file mode 100644 index 0000000..e3243bf --- /dev/null +++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Demonstrates usage of Ignite and Spark from Java. 
+ */ +package org.apache.ignite.spark.examples.java; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 0b8e845..742d7ee 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -43,9 +43,9 @@ import scala.collection.JavaConversions._ * @tparam V Value type. */ class IgniteRDD[K, V] ( - ic: IgniteContext[K, V], - cacheName: String, - cacheCfg: CacheConfiguration[K, V] + val ic: IgniteContext[K, V], + val cacheName: String, + val cacheCfg: CacheConfiguration[K, V] ) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) { /** * Computes iterator based on given partition. @@ -73,7 +73,7 @@ class IgniteRDD[K, V] ( * * @return Partitions. */ - override protected def getPartitions: Array[Partition] = { + override protected[spark] def getPartitions: Array[Partition] = { ensureCache() val parts = ic.ignite().affinity(cacheName).partitions() @@ -87,7 +87,7 @@ class IgniteRDD[K, V] ( * @param split Split partition. * @return */ - override protected def getPreferredLocations(split: Partition): Seq[String] = { + override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = { ensureCache() ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala new file mode 100644 index 0000000..6c52e65 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark + +import org.apache.ignite.Ignite +import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.lang.IgniteOutClosure +import org.apache.spark.api.java.JavaSparkContext + +import scala.reflect.ClassTag + +class JavaIgniteContext[K, V]( + @scala.transient val sc: JavaSparkContext, + val cfgF: IgniteOutClosure[IgniteConfiguration]) extends Serializable { + + @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply()) + + def this(sc: JavaSparkContext, springUrl: String) { + this(sc, new IgniteOutClosure[IgniteConfiguration] { + override def apply() = IgnitionEx.loadConfiguration(springUrl).get1() + }) + } + + def fromCache(cacheName: String): JavaIgniteRDD[K, V] = + JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null)) + + def fromCache(cacheCfg: CacheConfiguration[K, V]) = + JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg)) + + def ignite(): Ignite = ic.ignite() + + def close() = ic.close() + + private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] + + implicit val ktag: ClassTag[K] = fakeClassTag + + implicit val vtag: ClassTag[V] = fakeClassTag +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala new file mode 100644 index 0000000..f2c9c9a --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark + +import java.util + +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame +import org.apache.spark.{Partition, TaskContext} + +import scala.annotation.varargs +import scala.collection.JavaConversions._ +import scala.language.implicitConversions +import scala.reflect.ClassTag + + +class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V]) + extends JavaPairRDD[K, V](rdd)(JavaIgniteRDD.fakeClassTag, JavaIgniteRDD.fakeClassTag) { + //with JavaRDDLike[(K, V), JavaPairRDD[K, V]] { + + override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) + + override val classTag: ClassTag[(K, V)] = JavaIgniteRDD.fakeClassTag + + /** + * Computes iterator based on given partition. + * + * @param part Partition to use. + * @param context Task context. 
+ * @return Partition iterator. + */ + def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { + rdd.compute(part, context) + } + + /** + * Gets partitions for the given cache RDD. + * + * @return Partitions. + */ + protected def getPartitions: java.util.List[Partition] = { + new util.ArrayList[Partition](rdd.getPartitions.toSeq) + } + + /** + * Gets preferred locations for the given partition. + * + * @param split Split partition. + * @return + */ + protected def getPreferredLocations(split: Partition): Seq[String] = { + rdd.getPreferredLocations(split) + } + + @varargs def objectSql(typeName: String, sql: String, args: Any*): JavaPairRDD[K, V] = + JavaPairRDD.fromRDD(rdd.objectSql(typeName, sql, args)) + + @varargs def sql(sql: String, args: Any*): DataFrame = rdd.sql(sql, args) + + def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd)) + + def savePairs(jrdd: JavaPairRDD[K, V]) = { + val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd) + + rdd.savePairs(rrdd) + } + + def clear(): Unit = rdd.clear() +} + +object JavaIgniteRDD { + implicit def fromIgniteRDD[K: ClassTag, V: ClassTag](rdd: IgniteRDD[K, V]): JavaIgniteRDD[K, V] = + new JavaIgniteRDD[K, V](rdd) + + implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = rdd.rdd + + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala new file mode 100644 index 0000000..13bd3e8 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.impl + +import org.apache.ignite.IgniteCache +import org.apache.ignite.spark.IgniteRDD +import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike} + +abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V]) + extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { + + protected def ensureCache(): IgniteCache[K, V] = { + // Make sure to deploy the cache + if (rdd.cacheCfg != null) + rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg) + else + rdd.ic.ignite().getOrCreateCache(rdd.cacheName) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java new file mode 100644 index 0000000..d0ad4d4 --- /dev/null +++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; + +import scala.*; + +import java.util.*; + +/** + * Tests for {@link JavaIgniteRDD}. + */ +public class JavaIgniteRDDSelfTest extends GridCommonAbstractTest { + /** Grid count. */ + private static final int GRID_CNT = 3; + + /** Keys count. */ + private static final int KEYS_CNT = 10000; + + /** Cache name. */ + private static final String PARTITIONED_CACHE_NAME = "partitioned"; + + /** Ip finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Sum function. */ + private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() { + public Integer call(Integer x, Integer y) { + return x + y; + } + }; + + /** To pair function. 
*/ + private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() { + /** {@inheritDoc} */ + @Override public Tuple2<String, String> call(Integer i) { + return new Tuple2<>(String.valueOf(i), "val" + i); + } + }; + + /** (String, Integer); pair to Integer value function. */ + private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>(); + + /** (String, Entity) pair to Entity value function. */ + private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F = + new PairToValueFunction<>(); + + /** Integer to entity function. */ + private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F = + new PairFunction<Integer, String, Entity>() { + @Override public Tuple2<String, Entity> call(Integer i) throws Exception { + return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100)); + } + }; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + Ignition.stop("client", false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < GRID_CNT; i++) + Ignition.start(getConfiguration("grid-" + i, false)); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + for (int i = 0; i < GRID_CNT; i++) + Ignition.stop("grid-" + i, false); + } + + /** + * @throws Exception If failed. + */ + public void testStoreDataToIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + ic.fromCache(PARTITIONED_CACHE_NAME) + .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F)); + + Ignite ignite = Ignition.ignite("grid-0"); + + IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME); + + for (int i = 0; i < KEYS_CNT; i++) { + String val = cache.get(String.valueOf(i)); + + assertNotNull("Value was not put to cache for key: " + i, val); + assertEquals("Invalid value stored for key: " + i, "val" + i, val); + } + } + finally { + sc.stop(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReadDataFromIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + Ignite ignite = Ignition.ignite("grid-0"); + + IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME); + + for (int i = 0; i < KEYS_CNT; i++) + cache.put(String.valueOf(i), i); + + JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F); + + int sum = values.fold(0, SUM_F); + + int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT; + + assertEquals(expSum, sum); + } + finally { + sc.stop(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testQueryObjectsFromIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); + + cache.savePairs(sc.parallelize(F.range(0, 1000), 2).mapToPair(INT_TO_ENTITY_F)); + + List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000) + .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect(); + + assertEquals("Invalid result length", 1, res.size()); + } + finally { + sc.stop(); + } + } + + /** + * @param gridName Grid name. + * @param client Client. + */ + private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception { + IgniteConfiguration cfg = new IgniteConfiguration(); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(cacheConfiguration()); + + cfg.setClientMode(client); + + cfg.setGridName(gridName); + + return cfg; + } + + /** + * Creates cache configuration. + */ + private static CacheConfiguration<Object, Object> cacheConfiguration() { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setBackups(1); + + ccfg.setName(PARTITIONED_CACHE_NAME); + + ccfg.setIndexedTypes(String.class, Entity.class); + + return ccfg; + } + + /** + * Ignite configiration provider. + */ + static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> { + /** {@inheritDoc} */ + @Override public IgniteConfiguration apply() { + try { + return getConfiguration("client", true); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> { + /** {@inheritDoc} */ + @Override public V call(Tuple2<K, V> t) throws Exception { + return t._2(); + } + }; + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c6d28a/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index a514e35..f5b73df 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -321,6 +321,10 @@ <title>Mesos Framework</title> <packages>org.apache.ignite.mesos*</packages> </group> + <group> + <title>Spark Integration</title> + <packages>org.apache.ignite.spark.examples.java</packages> + </group> </groups> <header> <![CDATA[