Repository: incubator-ignite Updated Branches: refs/heads/ignite-389 29dc7221c -> c527a0447
#IGNITE-389 - More functions on API. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/37a7679d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/37a7679d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/37a7679d Branch: refs/heads/ignite-389 Commit: 37a7679df3fd05473840482d0e5c2c9483d02b2a Parents: 5d6bb53 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Fri May 29 08:46:52 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Fri May 29 08:46:52 2015 -0700 ---------------------------------------------------------------------- .../org/apache/ignite/spark/IgniteContext.scala | 13 ++++++++ .../org/apache/ignite/spark/IgniteRDD.scala | 32 +++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37a7679d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index a73405b..5b649db 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -23,6 +23,14 @@ import org.apache.ignite.{Ignition, Ignite} import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} import org.apache.spark.SparkContext +/** + * Ignite context. + * + * @param sparkContext Spark context. + * @param cfgF Configuration factory. + * @tparam K Key type. + * @tparam V Value type. 
+ */ class IgniteContext[K, V]( @scala.transient val sparkContext: SparkContext, cfgF: () => IgniteConfiguration @@ -61,4 +69,9 @@ class IgniteContext[K, V]( } } + def close() = { + val igniteCfg = cfgF() + + Ignition.stop(igniteCfg.getGridName, false) + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37a7679d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 30efa7a..358fcd4 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -29,12 +29,27 @@ import org.apache.spark.{TaskContext, Partition} import scala.collection.JavaConversions._ +/** + * Ignite RDD. Represents Ignite cache as Spark RDD abstraction. + * + * @param ic Ignite context to use. + * @param cacheName Cache name. + * @param cacheCfg Cache configuration. + * @tparam K Key type. + * @tparam V Value type. + */ class IgniteRDD[K, V] ( ic: IgniteContext[K, V], cacheName: String, cacheCfg: CacheConfiguration[K, V] ) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) { - + /** + * Computes iterator based on given partition. + * + * @param part Partition to use. + * @param context Task context. + * @return Partition iterator. + */ override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { val cache = ensureCache() @@ -49,6 +64,11 @@ class IgniteRDD[K, V] ( }) } + /** + * Gets partitions for the given cache RDD. + * + * @return Partitions. + */ override protected def getPartitions: Array[Partition] = { ensureCache() @@ -57,6 +77,12 @@ class IgniteRDD[K, V] ( (0 until parts).map(new IgnitePartition(_)).toArray } + /** + * Gets preferred locations for the given partition. + * + * @param split Split partition. 
+ * @return Preferred locations. + */ override protected def getPreferredLocations(split: Partition): Seq[String] = { ensureCache() @@ -129,6 +155,10 @@ class IgniteRDD[K, V] ( }) } + def clear(): Unit = { + ensureCache().removeAll() + } + private def affinityKeyFunc(value: V, node: ClusterNode): Object = { IgniteUuid.randomUuid() }