Repository: incubator-ignite

Updated Branches:
  refs/heads/ignite-948 [created] 4944bb482
ignite-948 Add Java API for Ignite RDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4944bb48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4944bb48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4944bb48

Branch: refs/heads/ignite-948
Commit: 4944bb4827d5f0c2517b47441bc0f1259f9378bf
Parents: c527a04
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 2 01:09:17 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Tue Jun 2 01:09:17 2015 +0300

----------------------------------------------------------------------
 .../spark/examples/java/ColocationTest.java     |  74 ++++++++++++++
 .../org/apache/ignite/spark/IgniteRDD.scala     |  10 +-
 .../apache/ignite/spark/JavaIgniteContext.scala |  55 ++++++++++
 .../org/apache/ignite/spark/JavaIgniteRDD.scala | 101 +++++++++++++++++++
 4 files changed, 235 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
new file mode 100644
index 0000000..932f922
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import scala.*;
+
+import java.util.*;
+
+public class ColocationTest {
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Colocation test");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Colocation test", conf);
+
+        JavaIgniteContext<Integer, Integer> ignite = new JavaIgniteContext<>(sc, new IgniteOutClosure<IgniteConfiguration>() {
+            @Override public IgniteConfiguration apply() {
+                return ExampleConfiguration.configuration();
+            }
+        });
+
+        JavaIgniteRDD<Integer, Integer> cache = ignite.fromCache("partitioned");
+
+        List<Integer> seq = new ArrayList<>();
+
+        long sum = 0;
+
+        for (int i = 0; i < 100000; i++) {
+            seq.add(i);
+
+            sum += i;
+        }
+
+        cache.savePairs(sc.parallelize(seq, 48).map(new Function<Integer, Tuple2<Integer, Integer>>() {
+            @Override public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
+                return new Tuple2<>(v1, v1);
+            }
+        }));
+
+        // Execute parallel sum.
+        System.out.println("Local sum: " + sum);
+
+        // AbstractFunction1 is used because a Java anonymous class cannot
+        // implement the scala.Function1 trait directly on pre-2.12 Scala.
+        Function1<Tuple2<Integer, Integer>, Integer> f = new scala.runtime.AbstractFunction1<Tuple2<Integer, Integer>, Integer>() {
+            @Override public Integer apply(Tuple2<Integer, Integer> t) {
+                return t._2();
+            }
+        };
+
+        //System.out.println("Distributed sum: " + cache.map(f).sum())
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index f286b58..05df188 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -39,9 +39,9 @@ import scala.collection.JavaConversions._
  * @tparam V Value type.
  */
 class IgniteRDD[K, V] (
-    ic: IgniteContext[K, V],
-    cacheName: String,
-    cacheCfg: CacheConfiguration[K, V]
+    val ic: IgniteContext[K, V],
+    val cacheName: String,
+    val cacheCfg: CacheConfiguration[K, V]
 ) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
     /**
      * Computes iterator based on given partition.
@@ -69,7 +69,7 @@ class IgniteRDD[K, V] (
      *
      * @return Partitions.
      */
-    override protected def getPartitions: Array[Partition] = {
+    override protected[spark] def getPartitions: Array[Partition] = {
         ensureCache()
 
         val parts = ic.ignite().affinity(cacheName).partitions()
@@ -83,7 +83,7 @@ class IgniteRDD[K, V] (
      *
      * @param split Split partition.
      * @return
      */
-    override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = {
         ensureCache()
 
         ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
new file mode 100644
index 0000000..992be52
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.lang.IgniteOutClosure
+import org.apache.spark.api.java.JavaSparkContext
+
+import scala.reflect.ClassTag
+
+class JavaIgniteContext[K, V](
+    @scala.transient val sc: JavaSparkContext,
+    val cfg: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+
+    val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfg.apply())
+
+    def this(sc: JavaSparkContext, springUrl: String) {
+        this(sc, new IgniteOutClosure[IgniteConfiguration] {
+            override def apply() = IgnitionEx.loadConfiguration(springUrl).get1()
+        })
+    }
+
+    def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+
+    def fromCache(cacheCfg: CacheConfiguration[K, V]) =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+
+    def ignite(): Ignite = ic.ignite()
+
+    def close() = ic.close()
+
+    private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+    implicit val ktag: ClassTag[K] = fakeClassTag
+
+    implicit val vtag: ClassTag[V] = fakeClassTag
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
new file mode 100644
index 0000000..6944313
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.lang.IgniteBiTuple
+import org.apache.ignite.spark.impl.IgniteAbstractRDD
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, TaskContext}
+
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+class JavaIgniteRDD[K, V](val rdd: IgniteRDD[K, V])(implicit val ktag: ClassTag[K], implicit val vtag: ClassTag[V])
+    extends IgniteAbstractRDD[(K, V), K, V](rdd.ic, rdd.cacheName, rdd.cacheCfg) {
+
+    /**
+     * Computes iterator based on given partition.
+     *
+     * @param part Partition to use.
+     * @param context Task context.
+     * @return Partition iterator.
+     */
+    override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+        rdd.compute(part, context)
+    }
+
+    /**
+     * Gets partitions for the given cache RDD.
+     *
+     * @return Partitions.
+     */
+    override protected def getPartitions: Array[Partition] = {
+        rdd.getPartitions
+    }
+
+    /**
+     * Gets preferred locations for the given partition.
+     *
+     * @param split Split partition.
+     * @return Preferred locations.
+     */
+    override protected def getPreferredLocations(split: Partition): Seq[String] = {
+        rdd.getPreferredLocations(split)
+    }
+
+    def objectSql(typeName: String, sql: String, args: Any*): JavaRDD[IgniteBiTuple[K, V]] =
+        JavaRDD.fromRDD(rdd.objectSql(typeName, sql, args: _*).map(tuple2BiTuple[K, V](_)))
+
+    def sql(sql: String, args: Any*): JavaRDD[Seq[Any]] = JavaRDD.fromRDD(rdd.sql(sql, args: _*))
+
+    def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd))
+
+    def savePairs(jrdd: JavaRDD[(K, V)]) = rdd.savePairs(JavaRDD.toRDD(jrdd))
+
+    def clear(): Unit = rdd.clear()
+
+    implicit def tuple2BiTuple[A, B](tuple: (A, B)): IgniteBiTuple[A, B] =
+        new IgniteBiTuple[A, B](tuple._1, tuple._2)
+
+    implicit def tupleIt2BiTupleIt[A, B](it: Iterator[(A, B)]): java.util.Iterator[IgniteBiTuple[A, B]] =
+        new java.util.Iterator[IgniteBiTuple[A, B]] {
+            val target: java.util.Iterator[(A, B)] = it
+
+            override def next(): IgniteBiTuple[A, B] = target.next()
+
+            override def remove(): Unit = target.remove()
+
+            override def hasNext: Boolean = target.hasNext
+        }
+}
+
+object JavaIgniteRDD {
+    implicit def fromIgniteRDD[K: ClassTag, V: ClassTag](rdd: IgniteRDD[K, V]): JavaIgniteRDD[K, V] =
+        new JavaIgniteRDD[K, V](rdd)
+
+    implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = rdd.rdd
+}
+
+object JavaRDD {
+    implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
+
+    implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
+}
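----------------------------------------------------------------------

For reference, a minimal usage sketch (not part of this commit) of how the new Java API can be driven from an application. The Spring configuration path, the cache name "partitioned", and the class name below are placeholders; the springUrl constructor, fromCache, savePairs and close are the members introduced in JavaIgniteContext.scala and JavaIgniteRDD.scala above.

import java.util.ArrayList;
import java.util.List;

import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class JavaIgniteRddSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();

        conf.setAppName("java-ignite-rdd-sketch");

        JavaSparkContext sc = new JavaSparkContext("local[*]", "java-ignite-rdd-sketch", conf);

        // Spring-URL constructor added in JavaIgniteContext.scala; the path
        // below is a placeholder for an Ignite Spring configuration file.
        JavaIgniteContext<Integer, String> ic =
            new JavaIgniteContext<>(sc, "path/to/ignite-config.xml");

        // "partitioned" is a placeholder name for a cache configured on the Ignite side.
        JavaIgniteRDD<Integer, String> rdd = ic.fromCache("partitioned");

        // Build a small data set on the driver and write it through the Ignite RDD.
        List<Tuple2<Integer, String>> pairs = new ArrayList<>();

        for (int i = 0; i < 10; i++)
            pairs.add(new Tuple2<>(i, "value-" + i));

        rdd.savePairs(sc.parallelize(pairs));

        System.out.println("Stored " + pairs.size() + " pairs in Ignite.");

        ic.close();
        sc.stop();
    }
}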