IGNITE-389 - WIP

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/389ec79d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/389ec79d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/389ec79d

Branch: refs/heads/ignite-389
Commit: 389ec79dff949fda2c7c16c84141b1b5f86793b5
Parents: edd1a95
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Tue May 19 19:51:43 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Tue May 19 19:51:43 2015 -0700

----------------------------------------------------------------------
 modules/spark/pom.xml                           | 47 ++++++-----
 .../org/apache/ignite/spark/IgniteContext.scala | 85 +++++++++++++++-----
 .../org/apache/ignite/spark/IgniteRDD.scala     | 13 +--
 .../spark/examples/ExampleConfiguration.scala   | 41 ++++++++++
 .../spark/examples/IgniteProcessExample.scala   | 44 ++++++++++
 .../spark/examples/IgniteStoreExample.scala     | 40 +++++++++
 .../spark/util/SerializablePredicate2.scala     | 32 ++++++++
 .../org/apache/ignite/spark/util/using.scala    | 32 --------
 8 files changed, 248 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 9d4ea86..aeae234 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -48,37 +48,40 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-scalar</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
-            <version>2.11.2</version>
+            <version>2.10.4</version>
         </dependency>
         <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_2.11</artifactId>
-            <version>2.2.4</version>
-            <scope>test</scope>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <version>1.3.1</version>
             <exclusions>
                 <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
+                    <groupId>com.twitter</groupId>
+                    <artifactId>chill_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.twitter</groupId>
+                    <artifactId>chill-java</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
-            <version>1.3.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
-            <version>1.3.1</version>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.apache.spark</groupId>-->
+            <!--<artifactId>spark-sql_2.10</artifactId>-->
+            <!--<version>1.3.1</version>-->
+            <!--<exclusions>-->
+                <!--<exclusion>-->
+                    <!--<groupId>com.twitter</groupId>-->
+                    <!--<artifactId>chill_2.11</artifactId>-->
+                <!--</exclusion>-->
+                <!--<exclusion>-->
+                    <!--<groupId>com.twitter</groupId>-->
+                    <!--<artifactId>chill-java</artifactId>-->
+                <!--</exclusion>-->
+            <!--</exclusions>-->
+        <!--</dependency>-->
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 0913605..56d2a05 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -17,42 +17,51 @@
 
 package org.apache.ignite.spark
 
+import javax.cache.Cache
+
+import org.apache.ignite.cache.query.{Query, ScanQuery}
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.internal.IgnitionEx
-import org.apache.ignite.spark.util.using
+import org.apache.ignite.lang.IgniteUuid
+import org.apache.ignite.spark.util.SerializablePredicate2
 import org.apache.ignite.{Ignition, IgniteCache, Ignite}
 import org.apache.ignite.configuration.{CacheConfiguration, 
IgniteConfiguration}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
 class IgniteContext[K, V](
-    sc: SparkContext,
-    igniteCfg: IgniteConfiguration,
+    @scala.transient sc: SparkContext,
+    cfgF: () => IgniteConfiguration,
     val cacheName: String,
     cacheCfg: CacheConfiguration[K, V]
-) {
+) extends Serializable {
+    type ScanRDD[K1, V1] = IgniteRDD[Cache.Entry[K1, V1], K1, V1]
+
     def this(
         sc: SparkContext,
         springUrl: String,
         cacheName: String
     ) {
-        this(sc, IgnitionEx.loadConfiguration(springUrl).get1(), cacheName, 
null)
+        this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1(), 
cacheName, null)
     }
 
     def this(
         sc: SparkContext,
-        igniteCfg: IgniteConfiguration,
+        cfgF: () => IgniteConfiguration,
         cacheName: String
     ) {
-        this(sc, igniteCfg, cacheName, null)
+        this(sc, cfgF, cacheName, null)
     }
 
     def this(
         sc: SparkContext,
-        igniteCfg: IgniteConfiguration,
+        cfgF: () => IgniteConfiguration,
         cacheCfg: CacheConfiguration[K, V]
     ) {
-        this(sc, igniteCfg, cacheCfg.getName, cacheCfg)
+        this(sc, cfgF, cacheCfg.getName, cacheCfg)
     }
 
     def this(
@@ -60,43 +69,75 @@ class IgniteContext[K, V](
         springUrl: String,
         cacheCfg: CacheConfiguration[K, V]
     ) {
-        this(sc, IgnitionEx.loadConfiguration(springUrl).get1(), 
cacheCfg.getName, cacheCfg)
+        this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1(), 
cacheCfg.getName, cacheCfg)
     }
 
     def sparkContext() = sc
 
-    def saveToIgnite(rdd: RDD[V], keyFunc: (IgniteContext[K, V], V, 
ClusterNode) => K = affinityKeyFunc) = {
+    def scan(p: (K, V) => Boolean = (_, _) => true): ScanRDD[K, V] = {
+        new ScanRDD(this, new ScanQuery[K, V](new SerializablePredicate2[K, 
V](p)))
+    }
+
+    def scan[R:ClassTag](qry: Query[R]): IgniteRDD[R, K, V] = {
+        new IgniteRDD[R, K, V](this, qry)
+    }
+
+    def saveToIgnite[T](rdd: RDD[V], keyFunc: (IgniteContext[K, V], V, 
ClusterNode) => T = affinityKeyFunc(_: IgniteContext[K, V], _:V, _: 
ClusterNode)) = {
         rdd.foreachPartition(it => {
-            // TODO get affinity node
+            println("Using scala version: " + 
scala.util.Properties.versionString)
+            // Make sure to deploy the cache
+            igniteCache()
+
+            val ig = ignite()
 
-            using(ignite().dataStreamer[K, V](cacheName)) { streamer =>
+            val locNode = ig.cluster().localNode()
+
+            val node: Option[ClusterNode] = 
ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
+
+            val streamer = ignite().dataStreamer[T, V](cacheName)
+
+            try {
                 it.foreach(value => {
-                    val key: K = keyFunc(this, value, null)
+                    val key: T = keyFunc(this, value, node.orNull)
+
+                    println("Saving: " + key + ", " + value)
+
                     streamer.addData(key, value)
                 })
             }
+            finally {
+                streamer.close()
+            }
         })
     }
 
     def ignite(): Ignite = {
+        val igniteCfg = cfgF()
+
         try {
             Ignition.ignite(igniteCfg.getGridName)
         }
         catch {
             case e: Exception =>
-                igniteCfg.setClientMode(true)
+                try {
+                    igniteCfg.setClientMode(true)
 
-                Ignition.start(igniteCfg)
+                    Ignition.start(igniteCfg)
+                }
+                catch {
+                    case e: Exception => Ignition.ignite(igniteCfg.getGridName)
+                }
         }
     }
 
-    def igniteCache(): IgniteCache[K, V] = {
-//        new IgniteRDD[Object, K, V](this, (k: K, v: V) => {true})
-
-        ignite().cache(cacheName)
+    private def igniteCache(): IgniteCache[K, V] = {
+        if (cacheCfg == null)
+            ignite().getOrCreateCache(cacheName)
+        else
+            ignite().getOrCreateCache(cacheCfg)
     }
 
-    private def affinityKeyFunc(ic: IgniteContext[K, V], key: K, node: 
ClusterNode) = {
-        null
+    private def affinityKeyFunc(ic: IgniteContext[K, V], value: V, node: 
ClusterNode): Object = {
+        IgniteUuid.randomUuid()
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 186d1ae..4018c53 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -17,25 +17,18 @@
 
 package org.apache.ignite.spark
 
-import org.apache.ignite.cache.query.{ScanQuery, Query}
-import org.apache.ignite.scalar.lang.ScalarPredicate2
+import org.apache.ignite.cache.query.Query
 import org.apache.ignite.spark.impl.{IgniteQueryIterator, IgnitePartition}
 import org.apache.spark.{TaskContext, Partition}
 import org.apache.spark.rdd.RDD
 
 import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
 
-class IgniteRDD[R, K, V](
+class IgniteRDD[R:ClassTag, K, V](
     ic: IgniteContext[K, V],
     qry: Query[R]
 ) extends RDD[R] (ic.sparkContext(), deps = Nil) {
-    def this(
-        ic: IgniteContext[K, V],
-        p: (K, V) => Boolean
-    ) = {
-        this(ic, new ScanQuery[K, V](new ScalarPredicate2[K, V](p)))
-    }
-
     override def compute(part: Partition, context: TaskContext): Iterator[R] = 
{
         new IgniteQueryIterator[R, K, V](ic, part, qry)
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala
new file mode 100644
index 0000000..3b0dac7
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples
+
+import org.apache.ignite.configuration.IgniteConfiguration
+import org.apache.ignite.internal.util.lang.{GridFunc => F}
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
+
+object ExampleConfiguration { // Ignite configuration shared by the Spark examples in this package.
+    def configuration(): IgniteConfiguration = { // Returns a new instance on every call.
+        val cfg = new IgniteConfiguration()
+        // Node discovery uses the TCP discovery SPI, wired into cfg below.
+        val discoSpi = new TcpDiscoverySpi()
+        // IP finder configured with an explicit address list (see setAddresses below).
+        val ipFinder = new TcpDiscoveryVmIpFinder()
+        // NOTE(review): two hard-coded 127.0.0.1 ports; F.asList is Ignite's internal GridFunc, java.util.Arrays.asList would avoid it.
+        ipFinder.setAddresses(F.asList("127.0.0.1:47500", "127.0.0.1:47501"))
+        // Attach the IP finder to the discovery SPI, then the SPI to the configuration.
+        discoSpi.setIpFinder(ipFinder)
+
+        cfg.setDiscoverySpi(discoSpi)
+        // Callers pass `configuration _` as IgniteContext's () => IgniteConfiguration factory.
+        cfg
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
new file mode 100644
index 0000000..4aeecb0
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples
+
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.{SparkContext, SparkConf}
+
+object IgniteProcessExample { // Filters the "partitioned" cache and saves matching values to "results".
+    def main(args: Array[String]) { // args is not used.
+        val conf = new SparkConf().setAppName("Ignite processing example")
+        val sc = new SparkContext(conf)
+        // Source context over the "partitioned" cache; node config comes from ExampleConfiguration.
+        val partitioned = new IgniteContext[Object, String](sc, 
ExampleConfiguration.configuration _, "partitioned")
+        // scan() returns an RDD of Cache.Entry[Object, String] backed by a ScanQuery (see IgniteContext.scan).
+        // Keep only entries whose value contains "Ignite"; the predicate ignores the key.
+        val scan = partitioned.scan((k, v) => v.contains("Ignite"))
+        // The filter below only logs each entry and always returns true; map then extracts the cache value.
+        val processed = scan.filter(line => {
+            println("Analyzing line: " + line)
+            // Pass-through: nothing is filtered out here.
+            true
+        }).map(_.getValue)
+
+        // Context for the "results" cache; saveToIgnite creates the cache on demand (getOrCreateCache).
+        val results = new IgniteContext[Object, String](sc, 
ExampleConfiguration.configuration _, "results")
+        // Values are stored under keys from saveToIgnite's default key function (a random IgniteUuid in this commit).
+        results.saveToIgnite(processed) // NOTE(review): sc is never stopped; consider sc.stop() once the job completes.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
new file mode 100644
index 0000000..a7823f4
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples
+
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+
+object IgniteStoreExample { // Spark job: stores the lines of a text file in the "partitioned" cache.
+    def main(args: Array[String]) { // args(0): input file path (not validated; a missing arg throws).
+        val conf = new SparkConf().setAppName("Ignite store example")
+        val sc = new SparkContext(conf)
+        // NOTE(review): K is declared String, but saveToIgnite's default key function yields IgniteUuid keys.
+        val ignite = new IgniteContext[String, String](sc, 
ExampleConfiguration.configuration _, "partitioned")
+        // Read the input file; the filter only logs each line and keeps all of them.
+        val lines: RDD[String] = sc.textFile(args(0)).filter(line => {
+            println("Read line: " + line)
+            // Pass-through: every line is kept.
+            true
+        })
+        // Stream each line into the cache as a value; keys come from the default key function.
+        ignite.saveToIgnite(lines) // NOTE(review): sc is never stopped; consider sc.stop() once the job completes.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
new file mode 100644
index 0000000..484d0df
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.util
+
+import org.apache.ignite.lang.IgniteBiPredicate
+
+/** Adapts a Scala `(T1, T2) => Boolean` function to Ignite's `IgniteBiPredicate`.
+ * Used by `IgniteContext.scan` to wrap the user predicate into a `ScanQuery` filter.
+ */
+class SerializablePredicate2[T1, T2](private val p: (T1, T2) => Boolean) 
extends IgniteBiPredicate[T1, T2] {
+    assert(p != null)
+    // NOTE(review): `assert` is elidable; `require(p != null)` would always enforce the non-null contract.
+    /**
+     * Evaluates the wrapped function on `(e1, e2)` and returns its result.
+     */
+    def apply(e1: T1, e2: T2) = p(e1, e2)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala
deleted file mode 100644
index 3b46d16..0000000
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark.util
-
-import scala.util.Try
-
-object using {
-    type AutoClosable = { def close(): Unit }
-
-    def apply[A <: AutoClosable, B](resource: A)(code: A => B): B =
-        try {
-            code(resource)
-        }
-        finally {
-            Try(resource.close())
-        }
-}
\ No newline at end of file

Reply via email to