ignite-948 Add Java API for Ignite RDD

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5b8d2fd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5b8d2fd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5b8d2fd2

Branch: refs/heads/ignite-948
Commit: 5b8d2fd2712741a89e5a9d6765ed177857eeba0f
Parents: d0157d4
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 2 01:09:17 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Tue Jun 2 22:54:09 2015 +0300

----------------------------------------------------------------------
 modules/spark/pom.xml                           |  2 +-
 .../spark/examples/java/ColocationTest.java     | 83 ++++++++++++++++++
 .../examples/java/IgniteProcessExample.java     | 77 +++++++++++++++++
 .../spark/examples/java/IgniteStoreExample.java | 66 ++++++++++++++
 .../org/apache/ignite/spark/IgniteRDD.scala     | 10 +--
 .../apache/ignite/spark/JavaIgniteContext.scala | 55 ++++++++++++
 .../org/apache/ignite/spark/JavaIgniteRDD.scala | 90 ++++++++++++++++++++
 .../spark/impl/JavaIgniteAbstractRDD.scala      | 34 ++++++++
 8 files changed, 411 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b8d2fd2/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 84055d6..a9d8bf5 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -31,7 +31,7 @@
     </parent>
 
     <artifactId>ignite-spark</artifactId>
-    <version>1.0.7-SNAPSHOT</version>
+    <version>1.1.1-SNAPSHOT</version>
 
     <dependencies>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b8d2fd2/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
new file mode 100644
index 0000000..752a290
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+
+import scala.Tuple2;
+
+import java.util.*;
+
+/**
+ * Colocation test: writes the pairs {@code (i, i)} for {@code i = 0 .. ENTRY_CNT - 1} into the
+ * "partitioned" cache through a Spark pair RDD, then prints a locally computed sum of the values
+ * next to the sum computed over the Ignite-backed RDD so that the two can be compared.
+ */
+public class ColocationTest {
+    /** Number of entries written into the cache. */
+    private static final int ENTRY_CNT = 100000;
+
+    /** Number of slices used when parallelizing the source collection. */
+    private static final int SLICES = 48;
+
+    /**
+     * Entry point.
+     *
+     * @param args Command line arguments (not used).
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Colocation test");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Colocation test", conf);
+
+        try {
+            JavaIgniteContext<Integer, Integer> ignite = new JavaIgniteContext<>(sc,
+                new IgniteOutClosure<IgniteConfiguration>() {
+                    @Override public IgniteConfiguration apply() {
+                        return ExampleConfiguration.configuration();
+                    }
+                });
+
+            try {
+                runTest(sc, ignite);
+            }
+            finally {
+                // Release the Ignite context even if the test fails.
+                ignite.close();
+            }
+        }
+        finally {
+            // Stop Spark even if Ignite start-up or the test fails.
+            sc.stop();
+        }
+    }
+
+    /**
+     * Loads the test data into the "partitioned" cache and prints the local and distributed sums.
+     *
+     * @param sc Spark context.
+     * @param ignite Ignite context.
+     */
+    private static void runTest(JavaSparkContext sc, JavaIgniteContext<Integer, Integer> ignite) {
+        JavaIgniteRDD<Integer, Integer> cache = ignite.fromCache("partitioned");
+
+        List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(ENTRY_CNT);
+
+        long sum = 0;
+
+        for (int i = 0; i < ENTRY_CNT; i++) {
+            pairs.add(new Tuple2<>(i, i));
+
+            sum += i;
+        }
+
+        JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs, SLICES);
+
+        cache.savePairs(rdd);
+
+        System.out.println("Local sum: " + sum);
+
+        // Execute parallel sum. Values are widened to long before folding because the total
+        // (4 999 950 000 for 100 000 entries) does not fit into an int.
+        long distributedSum = cache.map(new Function<Tuple2<Integer, Integer>, Long>() {
+            @Override public Long call(Tuple2<Integer, Integer> t) {
+                return t._2().longValue();
+            }
+        }).fold(0L, new Function2<Long, Long, Long>() {
+            @Override public Long call(Long x, Long y) {
+                return x + y;
+            }
+        });
+
+        System.out.println("Distributed sum: " + distributedSum);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b8d2fd2/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
new file mode 100644
index 0000000..913f155
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+
+import scala.*;
+
+import java.lang.Boolean;
+import java.util.*;
+
+/**
+ * Processing example: keeps the values of the "partitioned" cache that contain "Ignite", saves
+ * them into the "results" cache, then runs an object SQL query and a fields SQL query against the
+ * "indexed" cache.
+ */
+public class IgniteProcessExample {
+    /**
+     * Entry point.
+     *
+     * @param args Command line arguments (not used).
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Ignite processing example");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Ignite processing example", conf);
+
+        try {
+            JavaIgniteContext<Object, String> ignite = new JavaIgniteContext<>(sc,
+                new IgniteOutClosure<IgniteConfiguration>() {
+                    @Override public IgniteConfiguration apply() {
+                        return ExampleConfiguration.configuration();
+                    }
+                });
+
+            try {
+                process(ignite);
+            }
+            finally {
+                // Release the Ignite context even if processing fails.
+                ignite.close();
+            }
+        }
+        finally {
+            // Stop Spark even if Ignite start-up or processing fails.
+            sc.stop();
+        }
+    }
+
+    /**
+     * Runs the filtering, saving and querying steps against the given Ignite context.
+     *
+     * @param ignite Ignite context.
+     */
+    private static void process(JavaIgniteContext<Object, String> ignite) {
+        // Search for lines containing "Ignite".
+        JavaIgniteRDD<Object, String> scanRdd = ignite.fromCache("partitioned");
+
+        JavaRDD<String> processedRdd = scanRdd.filter(new Function<Tuple2<Object, String>, Boolean>() {
+            @Override public Boolean call(Tuple2<Object, String> t) {
+                System.out.println("Analyzing line: " + t._2());
+
+                // Keep only the lines that mention Ignite.
+                return t._2().contains("Ignite");
+            }
+        }).map(new Function<Tuple2<Object, String>, String>() {
+            @Override public String call(Tuple2<Object, String> t) {
+                return t._2();
+            }
+        });
+
+        // Create a new cache for results.
+        JavaIgniteRDD<Object, String> results = ignite.fromCache("results");
+
+        results.saveValues(processedRdd);
+
+        // SQL query.
+        List<Tuple2<Object, String>> objRes =
+            ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect();
+
+        System.out.println("SQL query returned " + objRes.size() + " entries.");
+
+        // SQL fields query. RDDs are lazy, so collect() is what actually executes the query.
+        List<List<Object>> sqlRes =
+            ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20).collect();
+
+        System.out.println("SQL fields query returned " + sqlRes.size() + " rows.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b8d2fd2/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
new file mode 100644
index 0000000..52a69ec
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.Function;
+
+import scala.*;
+
+import java.lang.*;
+import java.lang.Boolean;
+
+/**
+ * Store example: reads a text file, keeps the lines containing "Ignite" and saves them into the
+ * "partitioned" cache, first via {@code saveValues} and then as {@code (line, line)} pairs via
+ * {@code savePairs}.
+ */
+public class IgniteStoreExample {
+    /**
+     * Entry point.
+     *
+     * @param args Command line arguments; {@code args[0]} is the path of the text file to load.
+     */
+    public static void main(String[] args) {
+        // Validate arguments before starting Spark and Ignite.
+        if (args.length < 1) {
+            System.err.println("Usage: IgniteStoreExample <path-to-text-file>");
+
+            System.exit(1);
+        }
+
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Ignite store example");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Ignite store example", conf);
+
+        try {
+            JavaIgniteContext<String, String> ignite = new JavaIgniteContext<>(sc,
+                new IgniteOutClosure<IgniteConfiguration>() {
+                    @Override public IgniteConfiguration apply() {
+                        return ExampleConfiguration.configuration();
+                    }
+                });
+
+            try {
+                store(sc, ignite, args[0]);
+            }
+            finally {
+                // Release the Ignite context even if storing fails.
+                ignite.close();
+            }
+        }
+        finally {
+            // Stop Spark even if Ignite start-up or storing fails.
+            sc.stop();
+        }
+    }
+
+    /**
+     * Reads the file, filters its lines and saves them into the "partitioned" cache.
+     *
+     * @param sc Spark context.
+     * @param ignite Ignite context.
+     * @param path Path of the text file to read.
+     */
+    private static void store(JavaSparkContext sc, JavaIgniteContext<String, String> ignite, String path) {
+        JavaRDD<String> lines = sc.textFile(path).filter(new Function<String, Boolean>() {
+            @Override public Boolean call(String s) {
+                System.out.println("Read line: " + s);
+
+                return s.contains("Ignite");
+            }
+        });
+
+        ignite.fromCache("partitioned").saveValues(lines);
+
+        ignite.fromCache("partitioned").savePairs(lines.mapToPair(new PairFunction<String, String, String>() {
+            @Override public Tuple2<String, String> call(String s) {
+                return new Tuple2<>(s, s);
+            }
+        }));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b8d2fd2/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index f286b58..05df188 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -39,9 +39,9 @@ import scala.collection.JavaConversions._
  * @tparam V Value type.
  */
 class IgniteRDD[K, V] (
-    ic: IgniteContext[K, V],
-    cacheName: String,
-    cacheCfg: CacheConfiguration[K, V]
+    val ic: IgniteContext[K, V],
+    val cacheName: String,
+    val cacheCfg: CacheConfiguration[K, V]
 ) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
     /**
      * Computes iterator based on given partition.
@@ -69,7 +69,7 @@ class IgniteRDD[K, V] (
      *
      * @return Partitions.
      */
-    override protected def getPartitions: Array[Partition] = {
+    override protected[spark] def getPartitions: Array[Partition] = {
         ensureCache()
 
         val parts = ic.ignite().affinity(cacheName).partitions()
@@ -83,7 +83,7 @@ class IgniteRDD[K, V] (
      * @param split Split partition.
      * @return
      */
-    override protected def getPreferredLocations(split: Partition): 
Seq[String] = {
+    override protected[spark] def getPreferredLocations(split: Partition): 
Seq[String] = {
         ensureCache()
 
         
ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b8d2fd2/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
new file mode 100644
index 0000000..992be52
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.configuration.{CacheConfiguration, 
IgniteConfiguration}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.lang.IgniteOutClosure
+import org.apache.spark.api.java.JavaSparkContext
+
+import scala.reflect.ClassTag
+
+/**
+ * Java-friendly counterpart of [[IgniteContext]].
+ *
+ * @param sc Java Spark context.
+ * @param cfg Closure returning the Ignite configuration; it is adapted to a Scala function and
+ *      handed to the underlying [[IgniteContext]].
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteContext[K, V](
+    @scala.transient val sc: JavaSparkContext,
+    val cfg: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+
+    /** Underlying Scala Ignite context built from the wrapped Spark context and `cfg`. */
+    val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfg.apply())
+
+    /**
+     * Creates a context whose Ignite configuration is obtained from `springUrl` via
+     * `IgnitionEx.loadConfiguration`.
+     *
+     * @param sc Java Spark context.
+     * @param springUrl Configuration location passed to `IgnitionEx.loadConfiguration`.
+     */
+    def this(sc: JavaSparkContext, springUrl: String) {
+        this(sc, new IgniteOutClosure[IgniteConfiguration] {
+            override def apply(): IgniteConfiguration = IgnitionEx.loadConfiguration(springUrl).get1()
+        })
+    }
+
+    /**
+     * Creates a Java Ignite RDD over the cache with the given name.
+     *
+     * @param cacheName Cache name.
+     * @return Java Ignite RDD.
+     */
+    def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+
+    /**
+     * Creates a Java Ignite RDD over the cache described by the given configuration.
+     *
+     * @param cacheCfg Cache configuration; its name is used as the cache name.
+     * @return Java Ignite RDD.
+     */
+    def fromCache(cacheCfg: CacheConfiguration[K, V]): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+
+    /** @return Ignite instance of the underlying context. */
+    def ignite(): Ignite = ic.ignite()
+
+    /** Closes the underlying Ignite context (result type is whatever `IgniteContext.close()` returns). */
+    def close() = ic.close()
+
+    /** Placeholder class tag for Java callers; delegates to `JavaIgniteRDD.fakeClassTag`. */
+    private[spark] def fakeClassTag[T]: ClassTag[T] = JavaIgniteRDD.fakeClassTag[T]
+
+    /** Implicit key class tag needed by `JavaIgniteRDD.fromIgniteRDD`. */
+    implicit val ktag: ClassTag[K] = fakeClassTag
+
+    /** Implicit value class tag needed by `JavaIgniteRDD.fromIgniteRDD`. */
+    implicit val vtag: ClassTag[V] = fakeClassTag
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b8d2fd2/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
new file mode 100644
index 0000000..ae8b7de
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import java.util
+
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, TaskContext}
+
+import scala.annotation.varargs
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+
+/**
+ * Java-friendly wrapper around [[IgniteRDD]]: a `JavaPairRDD` over the wrapped RDD that also
+ * exposes the Ignite-specific operations (SQL queries, saving into and clearing the cache).
+ *
+ * @param rdd Wrapped Ignite RDD.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
+    extends JavaPairRDD[K, V](rdd)(JavaIgniteRDD.fakeClassTag, JavaIgniteRDD.fakeClassTag) {
+
+    /** Wraps a plain RDD of pairs back into a `JavaPairRDD`. */
+    override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
+
+    /** Element class tag; a placeholder, since Java callers cannot provide real class tags. */
+    override val classTag: ClassTag[(K, V)] = JavaIgniteRDD.fakeClassTag
+
+    /**
+     * Computes iterator based on given partition.
+     *
+     * @param part Partition to use.
+     * @param context Task context.
+     * @return Partition iterator.
+     */
+    def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+        rdd.compute(part, context)
+    }
+
+    /**
+     * Gets partitions for the given cache RDD.
+     *
+     * @return Partitions, copied from the wrapped RDD into a Java list.
+     */
+    protected def getPartitions: java.util.List[Partition] = {
+        new util.ArrayList[Partition](rdd.getPartitions.toSeq)
+    }
+
+    /**
+     * Gets preferred locations for the given partition.
+     *
+     * @param split Split partition.
+     * @return Preferred locations reported by the wrapped RDD.
+     */
+    protected def getPreferredLocations(split: Partition): Seq[String] = {
+        rdd.getPreferredLocations(split)
+    }
+
+    /**
+     * Runs an object SQL query over entries of the given type.
+     *
+     * @param typeName Type name to query.
+     * @param sql SQL condition, e.g. `age > ?`.
+     * @param args Arguments for the `?` placeholders.
+     * @return Pair RDD with the matching entries.
+     */
+    @varargs def objectSql(typeName: String, sql: String, args: Any*): JavaPairRDD[K, V] =
+        // Expand the arguments with `: _*`; passing `args` as-is would bind the whole sequence
+        // as a single query argument.
+        JavaPairRDD.fromRDD(rdd.objectSql(typeName, sql, args: _*))
+
+    /**
+     * Runs a fields SQL query.
+     *
+     * @param sql SQL query text.
+     * @param args Arguments for the `?` placeholders.
+     * @return RDD of result rows, each row as a Java list of field values.
+     */
+    @varargs def sql(sql: String, args: Any*): JavaRDD[java.util.List[Any]] =
+        // Each row is converted to `java.util.List` by the JavaConversions implicits.
+        JavaRDD.fromRDD(rdd.sql(sql, args: _*).map(_.toSeq))
+
+    /**
+     * Saves the values of the given RDD into the cache.
+     *
+     * @param jrdd Java RDD of values.
+     */
+    def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd))
+
+    /**
+     * Saves the pairs of the given RDD into the cache.
+     *
+     * @param jrdd Java pair RDD of key-value pairs.
+     */
+    def savePairs(jrdd: JavaPairRDD[K, V]) = rdd.savePairs(JavaPairRDD.toRDD(jrdd))
+
+    /** Clears the cache backing the wrapped RDD. */
+    def clear(): Unit = rdd.clear()
+}
+
+/**
+ * Conversions between [[IgniteRDD]] and [[JavaIgniteRDD]] plus a class tag placeholder.
+ */
+object JavaIgniteRDD {
+    /**
+     * Wraps an Ignite RDD into its Java-friendly counterpart.
+     *
+     * @param rdd Ignite RDD to wrap.
+     * @return Java Ignite RDD backed by `rdd`.
+     */
+    implicit def fromIgniteRDD[K: ClassTag, V: ClassTag](rdd: IgniteRDD[K, V]): JavaIgniteRDD[K, V] = {
+        new JavaIgniteRDD[K, V](rdd)
+    }
+
+    /**
+     * Unwraps a Java Ignite RDD.
+     *
+     * @param rdd Java Ignite RDD.
+     * @return The Ignite RDD it wraps.
+     */
+    implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = {
+        rdd.rdd
+    }
+
+    /**
+     * Returns the `AnyRef` class tag cast to `ClassTag[T]`, used where Java callers cannot supply
+     * a real class tag.
+     */
+    def fakeClassTag[T]: ClassTag[T] = {
+        val anyRefTag: ClassTag[AnyRef] = ClassTag.AnyRef
+
+        anyRefTag.asInstanceOf[ClassTag[T]]
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b8d2fd2/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
new file mode 100644
index 0000000..13bd3e8
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.spark.IgniteRDD
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike}
+
+/**
+ * Base class for Java-side views of an [[IgniteRDD]].
+ *
+ * @param rdd Wrapped Ignite RDD.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V])
+    extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+
+    /**
+     * Gets or creates the cache backing the wrapped RDD: by configuration when one was given,
+     * otherwise by cache name.
+     *
+     * @return Cache instance.
+     */
+    protected def ensureCache(): IgniteCache[K, V] = {
+        val ignite = rdd.ic.ignite()
+
+        Option(rdd.cacheCfg) match {
+            case Some(cfg) => ignite.getOrCreateCache(cfg)
+            case None => ignite.getOrCreateCache(rdd.cacheName)
+        }
+    }
+}

Reply via email to