IGNITE-389 - WIP

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4d887323
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4d887323
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4d887323

Branch: refs/heads/ignite-389
Commit: 4d887323ca9fe9c0e3f2cb05f580db5d71727ec3
Parents: aa62584
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Mon May 18 19:57:24 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Mon May 18 19:57:24 2015 -0700

----------------------------------------------------------------------
 modules/spark/pom.xml                           |  5 ++
 .../org/apache/ignite/spark/IgniteContext.scala | 82 +++++++++++++++++++-
 .../apache/ignite/spark/IgnitePartition.scala   | 30 -------
 .../org/apache/ignite/spark/IgniteRDD.scala     | 34 +++++++-
 .../ignite/spark/impl/IgnitePartition.scala     | 24 ++++++
 .../ignite/spark/impl/IgniteQueryIterator.scala | 32 ++++++++
 .../org/apache/ignite/spark/util/using.scala    | 32 ++++++++
 7 files changed, 204 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 0a60c2f..9d4ea86 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -48,6 +48,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-scalar</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
             <version>2.11.2</version>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index c30c847..0913605 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -17,6 +17,86 @@
 
 package org.apache.ignite.spark
 
-class IgniteContext {
+import org.apache.ignite.cluster.ClusterNode
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.util.using
+import org.apache.ignite.{Ignition, IgniteCache, Ignite}
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
 
+class IgniteContext[K, V](
+    sc: SparkContext,
+    igniteCfg: IgniteConfiguration,
+    val cacheName: String,
+    cacheCfg: CacheConfiguration[K, V]
+) {
+    def this(
+        sc: SparkContext,
+        springUrl: String,
+        cacheName: String
+    ) {
+        this(sc, IgnitionEx.loadConfiguration(springUrl).get1(), cacheName, null)
+    }
+
+    def this(
+        sc: SparkContext,
+        igniteCfg: IgniteConfiguration,
+        cacheName: String
+    ) {
+        this(sc, igniteCfg, cacheName, null)
+    }
+
+    def this(
+        sc: SparkContext,
+        igniteCfg: IgniteConfiguration,
+        cacheCfg: CacheConfiguration[K, V]
+    ) {
+        this(sc, igniteCfg, cacheCfg.getName, cacheCfg)
+    }
+
+    def this(
+        sc: SparkContext,
+        springUrl: String,
+        cacheCfg: CacheConfiguration[K, V]
+    ) {
+        this(sc, IgnitionEx.loadConfiguration(springUrl).get1(), cacheCfg.getName, cacheCfg)
+    }
+
+    def sparkContext() = sc
+
+    def saveToIgnite(rdd: RDD[V], keyFunc: (IgniteContext[K, V], V, ClusterNode) => K = affinityKeyFunc) = {
+        rdd.foreachPartition(it => {
+            // TODO get affinity node
+
+            using(ignite().dataStreamer[K, V](cacheName)) { streamer =>
+                it.foreach(value => {
+                    val key: K = keyFunc(this, value, null)
+                    streamer.addData(key, value)
+                })
+            }
+        })
+    }
+
+    def ignite(): Ignite = {
+        try {
+            Ignition.ignite(igniteCfg.getGridName)
+        }
+        catch {
+            case e: Exception =>
+                igniteCfg.setClientMode(true)
+
+                Ignition.start(igniteCfg)
+        }
+    }
+
+    def igniteCache(): IgniteCache[K, V] = {
+//        new IgniteRDD[Object, K, V](this, (k: K, v: V) => {true})
+
+        ignite().cache(cacheName)
+    }
+
+    private def affinityKeyFunc(ic: IgniteContext[K, V], key: K, node: ClusterNode) = {
+        null
+    }
 }
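
A minimal usage sketch of the new IgniteContext API (not part of this commit). The Spring URL, cache name and key function are illustrative placeholders, and because affinityKeyFunc is still a stub in this WIP, the key function is passed explicitly:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.ignite.spark.IgniteContext

    object IgniteContextExample extends App {
        val sc = new SparkContext(new SparkConf().setAppName("ignite-context-example").setMaster("local[2]"))

        // Connect to the Ignite cluster described by a Spring XML file and bind to a cache by name.
        val ic = new IgniteContext[Int, String](sc, "config/example-cache.xml", "partitioned")

        // Stream the RDD elements into the cache, deriving a key from each value.
        ic.saveToIgnite(sc.parallelize(Seq("a", "b", "c")), (ctx, value, node) => value.hashCode)
    }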

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/IgnitePartition.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgnitePartition.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgnitePartition.scala
deleted file mode 100644
index 65a6fd4..0000000
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgnitePartition.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark
-
-import org.apache.ignite.cluster.ClusterNode
-import org.apache.spark.Partition
-
-class IgnitePartition(
-    ic: IgniteContext,
-    cacheName: String,
-    idx: Int) extends Partition {
-    override def index: Int = idx
-
-    def nodes(): Seq[ClusterNode] = ???
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index f3908c8..186d1ae 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -17,10 +17,36 @@
 
 package org.apache.ignite.spark
 
-import org.apache.spark.{TaskContext, Partition, SparkContext}
+import org.apache.ignite.cache.query.{ScanQuery, Query}
+import org.apache.ignite.scalar.lang.ScalarPredicate2
+import org.apache.ignite.spark.impl.{IgniteQueryIterator, IgnitePartition}
+import org.apache.spark.{TaskContext, Partition}
 import org.apache.spark.rdd.RDD
 
-class IgniteRDD[T](
-    sc: SparkContext
-) {
+import scala.collection.JavaConversions._
+
+class IgniteRDD[R, K, V](
+    ic: IgniteContext[K, V],
+    qry: Query[R]
+) extends RDD[R] (ic.sparkContext(), deps = Nil) {
+    def this(
+        ic: IgniteContext[K, V],
+        p: (K, V) => Boolean
+    ) = {
+        this(ic, new ScanQuery[K, V](new ScalarPredicate2[K, V](p)))
+    }
+
+    override def compute(part: Partition, context: TaskContext): Iterator[R] = {
+        new IgniteQueryIterator[R, K, V](ic, part, qry)
+    }
+
+    override protected def getPartitions: Array[Partition] = {
+        val parts = ic.ignite().affinity(ic.cacheName).partitions()
+
+        (0 until parts).map(new IgnitePartition(_)).toArray
+    }
+
+    override protected def getPreferredLocations(split: Partition): Seq[String] = {
+        ic.ignite().affinity(ic.cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList
+    }
 }
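
A sketch of how the reworked IgniteRDD is meant to be driven (not part of this commit). The entry type, cache name and predicate are illustrative, and since IgniteQueryIterator below is still stubbed out, this shows the intended shape of the API rather than a working end-to-end run:

    import javax.cache.Cache
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.ignite.spark.{IgniteContext, IgniteRDD}

    object IgniteRDDExample extends App {
        val sc = new SparkContext(new SparkConf().setAppName("ignite-rdd-example").setMaster("local[2]"))

        val ic = new IgniteContext[Int, String](sc, "config/example-cache.xml", "partitioned")

        // Scan the cache, keeping entries whose value starts with "a". One Spark partition is
        // created per Ignite cache partition, and getPreferredLocations routes each partition
        // to the addresses of its primary and backup nodes.
        val rdd = new IgniteRDD[Cache.Entry[Int, String], Int, String](ic, (k: Int, v: String) => v.startsWith("a"))

        println(rdd.count())
    }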

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
new file mode 100644
index 0000000..2def636
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.spark.Partition
+
+class IgnitePartition(idx: Int) extends Partition {
+    override def index: Int = idx
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
new file mode 100644
index 0000000..07b24a9
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.cache.query.Query
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.Partition
+
+class IgniteQueryIterator[R, K, V] (
+    ic: IgniteContext[K, V],
+    part: Partition,
+    qry: Query[R]
+    ) extends Iterator[R] {
+    override def hasNext: Boolean = ???
+
+    override def next(): R = ???
+}
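
The iterator above is still a stub. One possible way to back it, shown only as an illustration and not as the committed design, is to run the query through the cache obtained from the context and walk the resulting cursor:

    import org.apache.ignite.cache.query.Query
    import org.apache.ignite.spark.IgniteContext
    import org.apache.spark.Partition

    // Hypothetical sketch: partition-aware filtering (e.g. restricting a scan query
    // to part.index) is deliberately left out here.
    class CursorBackedQueryIterator[R, K, V](
        ic: IgniteContext[K, V],
        part: Partition,
        qry: Query[R]
    ) extends Iterator[R] {
        // Execute the query through the cache and iterate over the cursor lazily.
        private val cur = ic.igniteCache().query(qry).iterator()

        override def hasNext: Boolean = cur.hasNext

        override def next(): R = cur.next()
    }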

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala
new file mode 100644
index 0000000..3b46d16
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.util
+
+import scala.util.Try
+
+object using {
+    type AutoClosable = { def close(): Unit }
+
+    def apply[A <: AutoClosable, B](resource: A)(code: A => B): B =
+        try {
+            code(resource)
+        }
+        finally {
+            Try(resource.close())
+        }
+}
\ No newline at end of file
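
A quick usage sketch for the new using helper (not part of this commit): any value exposing close(): Unit satisfies the structural AutoClosable type, and close() is attempted even when the body throws. The file path is an illustrative placeholder:

    import java.io.PrintWriter

    import org.apache.ignite.spark.util.using

    object UsingExample extends App {
        using(new PrintWriter("/tmp/using-example.txt")) { out =>
            // The writer is closed by using() after this block, even if println throws.
            out.println("hello from using()")
        }
    }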
