IGNITE-389 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4d887323 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4d887323 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4d887323 Branch: refs/heads/ignite-389 Commit: 4d887323ca9fe9c0e3f2cb05f580db5d71727ec3 Parents: aa62584 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Mon May 18 19:57:24 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Mon May 18 19:57:24 2015 -0700 ---------------------------------------------------------------------- modules/spark/pom.xml | 5 ++ .../org/apache/ignite/spark/IgniteContext.scala | 82 +++++++++++++++++++- .../apache/ignite/spark/IgnitePartition.scala | 30 ------- .../org/apache/ignite/spark/IgniteRDD.scala | 34 +++++++- .../ignite/spark/impl/IgnitePartition.scala | 24 ++++++ .../ignite/spark/impl/IgniteQueryIterator.scala | 32 ++++++++ .../org/apache/ignite/spark/util/using.scala | 32 ++++++++ 7 files changed, 204 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml index 0a60c2f..9d4ea86 100644 --- a/modules/spark/pom.xml +++ b/modules/spark/pom.xml @@ -48,6 +48,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-scalar</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.2</version> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index c30c847..0913605 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -17,6 +17,86 @@ package org.apache.ignite.spark -class IgniteContext { +import org.apache.ignite.cluster.ClusterNode +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.spark.util.using +import org.apache.ignite.{Ignition, IgniteCache, Ignite} +import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +class IgniteContext[K, V]( + sc: SparkContext, + igniteCfg: IgniteConfiguration, + val cacheName: String, + cacheCfg: CacheConfiguration[K, V] +) { + def this( + sc: SparkContext, + springUrl: String, + cacheName: String + ) { + this(sc, IgnitionEx.loadConfiguration(springUrl).get1(), cacheName, null) + } + + def this( + sc: SparkContext, + igniteCfg: IgniteConfiguration, + cacheName: String + ) { + this(sc, igniteCfg, cacheName, null) + } + + def this( + sc: SparkContext, + igniteCfg: IgniteConfiguration, + cacheCfg: CacheConfiguration[K, V] + ) { + this(sc, igniteCfg, cacheCfg.getName, cacheCfg) + } + + def this( + sc: SparkContext, + springUrl: String, + cacheCfg: CacheConfiguration[K, V] + ) { + this(sc, IgnitionEx.loadConfiguration(springUrl).get1(), cacheCfg.getName, cacheCfg) + } + + def sparkContext() = sc + + def saveToIgnite(rdd: RDD[V], keyFunc: (IgniteContext[K, V], V, ClusterNode) => K = affinityKeyFunc) = { + rdd.foreachPartition(it => { + // TODO get affinity node + + using(ignite().dataStreamer[K, V](cacheName)) { streamer => + it.foreach(value => { + val key: K = keyFunc(this, value, null) + streamer.addData(key, value) + }) + } + }) + } + + def ignite(): Ignite = { + try { + Ignition.ignite(igniteCfg.getGridName) + } + catch { + case e: Exception => + igniteCfg.setClientMode(true) + + Ignition.start(igniteCfg) + } + } + + def igniteCache(): IgniteCache[K, V] = { +// new IgniteRDD[Object, K, V](this, (k: K, v: V) => {true}) + + ignite().cache(cacheName) + } + + private def affinityKeyFunc(ic: IgniteContext[K, V], key: K, node: ClusterNode) = { + null + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/IgnitePartition.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgnitePartition.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgnitePartition.scala deleted file mode 100644 index 65a6fd4..0000000 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgnitePartition.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spark - -import org.apache.ignite.cluster.ClusterNode -import org.apache.spark.Partition - -class IgnitePartition( - ic: IgniteContext, - cacheName: String, - idx: Int) extends Partition { - override def index: Int = idx - - def nodes(): Seq[ClusterNode] = ??? -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index f3908c8..186d1ae 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -17,10 +17,36 @@ package org.apache.ignite.spark -import org.apache.spark.{TaskContext, Partition, SparkContext} +import org.apache.ignite.cache.query.{ScanQuery, Query} +import org.apache.ignite.scalar.lang.ScalarPredicate2 +import org.apache.ignite.spark.impl.{IgniteQueryIterator, IgnitePartition} +import org.apache.spark.{TaskContext, Partition} import org.apache.spark.rdd.RDD -class IgniteRDD[T]( - sc: SparkContext -) { +import scala.collection.JavaConversions._ + +class IgniteRDD[R, K, V]( + ic: IgniteContext[K, V], + qry: Query[R] +) extends RDD[R] (ic.sparkContext(), deps = Nil) { + def this( + ic: IgniteContext[K, V], + p: (K, V) => Boolean + ) = { + this(ic, new ScanQuery[K, V](new ScalarPredicate2[K, V](p))) + } + + override def compute(part: Partition, context: TaskContext): Iterator[R] = { + new IgniteQueryIterator[R, K, V](ic, part, qry) + } + + override protected def getPartitions: Array[Partition] = { + val parts = ic.ignite().affinity(ic.cacheName).partitions() + + (0 until parts).map(new IgnitePartition(_)).toArray + } + + override protected def getPreferredLocations(split: Partition): Seq[String] = { + ic.ignite().affinity(ic.cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala new file mode 100644 index 0000000..2def636 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl + +import org.apache.spark.Partition + +class IgnitePartition(idx: Int) extends Partition { + override def index: Int = idx +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala new file mode 100644 index 0000000..07b24a9 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl + +import org.apache.ignite.cache.query.Query +import org.apache.ignite.spark.IgniteContext +import org.apache.spark.Partition + +class IgniteQueryIterator[R, K, V] ( + ic: IgniteContext[K, V], + part: Partition, + qry: Query[R] + ) extends Iterator[R] { + override def hasNext: Boolean = ??? + + override def next(): R = ??? +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d887323/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala new file mode 100644 index 0000000..3b46d16 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.util + +import scala.util.Try + +object using { + type AutoClosable = { def close(): Unit } + + def apply[A <: AutoClosable, B](resource: A)(code: A => B): B = + try { + code(resource) + } + finally { + Try(resource.close()) + } +} \ No newline at end of file