Repository: incubator-ignite Updated Branches: refs/heads/ignite-1009-v4 b06eb0e19 -> 7c51a1416
#IGNITE-389 - More functions on API. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c527a044 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c527a044 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c527a044 Branch: refs/heads/ignite-1009-v4 Commit: c527a04471bd4d263a346ee27a5dbef6c98a5894 Parents: 8503dec Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Fri May 29 16:29:33 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Fri May 29 16:29:33 2015 -0700 ---------------------------------------------------------------------- .../org/apache/ignite/spark/IgniteContext.scala | 6 +++--- .../scala/org/apache/ignite/spark/IgniteRDD.scala | 16 ++++++++-------- .../ignite/spark/examples/ColocationTest.scala | 2 +- .../ignite/spark/examples/IgniteStoreExample.scala | 2 +- .../ignite/spark/impl/IgniteQueryIterator.scala | 2 +- .../org/apache/ignite/spark/impl/IgniteSqlRDD.scala | 2 +- .../org/apache/ignite/spark/IgniteRddSpec.scala | 10 +++++----- 7 files changed, 20 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c527a044/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index 5b649db..6259665 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -33,7 +33,7 @@ import org.apache.spark.SparkContext */ class IgniteContext[K, V]( @scala.transient val sparkContext: SparkContext, - cfgF: () => IgniteConfiguration + cfgF: () â IgniteConfiguration ) extends Serializable { def this( sc: SparkContext, @@ -57,14 +57,14 @@ class IgniteContext[K, V]( Ignition.ignite(igniteCfg.getGridName) } catch { - case e: Exception => + case e: Exception â try { igniteCfg.setClientMode(true) Ignition.start(igniteCfg) } catch { - case e: Exception => Ignition.ignite(igniteCfg.getGridName) + case e: Exception â Ignition.ignite(igniteCfg.getGridName) } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c527a044/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 358fcd4..f286b58 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -59,7 +59,7 @@ class IgniteRDD[K, V] ( val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator() - new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry => { + new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry â { (entry.getKey, entry.getValue) }) } @@ -95,7 +95,7 @@ class IgniteRDD[K, V] ( qry.setArgs(args.map(_.asInstanceOf[Object]):_*) - new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry => (entry.getKey, entry.getValue)) + new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry â (entry.getKey, entry.getValue)) } def sql(sql: String, args: Any*): RDD[Seq[Any]] = { @@ -103,11 +103,11 @@ class IgniteRDD[K, V] ( qry.setArgs(args.map(_.asInstanceOf[Object]):_*) - new IgniteSqlRDD[Seq[Any], java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list => list) + new IgniteSqlRDD[Seq[Any], java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list â list) } def saveValues(rdd: RDD[V]) = { - rdd.foreachPartition(it => { + rdd.foreachPartition(it â { val ig = ic.ignite() ensureCache() @@ -119,7 +119,7 @@ class IgniteRDD[K, V] ( val streamer = ig.dataStreamer[Object, V](cacheName) try { - it.foreach(value => { + it.foreach(value â { val key = affinityKeyFunc(value, node.orNull) streamer.addData(key, value) @@ -131,8 +131,8 @@ class IgniteRDD[K, V] ( }) } - def saveTuples(rdd: RDD[(K, V)]) = { - rdd.foreachPartition(it => { + def savePairs(rdd: RDD[(K, V)]) = { + rdd.foreachPartition(it â { val ig = ic.ignite() // Make sure to deploy the cache @@ -145,7 +145,7 @@ class IgniteRDD[K, V] ( val streamer = ig.dataStreamer[K, V](cacheName) try { - it.foreach(tup => { + it.foreach(tup â { streamer.addData(tup._1, tup._2) }) } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c527a044/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala index a0814fa..e1d3d8e 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala @@ -31,7 +31,7 @@ object ColocationTest { // Search for lines containing "Ignite". val cache = ignite.fromCache("partitioned") - cache.saveTuples(sc.parallelize((1 to 100000).toSeq, 48).map(i => (i, i))) + cache.savePairs(sc.parallelize((1 to 100000).toSeq, 48).map(i => (i, i))) // Execute parallel sum. println("Local sum: " + (1 to 100000).sum) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c527a044/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala index 24be795..ad6b7e6 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala @@ -36,6 +36,6 @@ object IgniteStoreExample { }) ignite.fromCache("partitioned").saveValues(lines) - ignite.fromCache("").saveTuples(lines.map(l â (l, l))) + ignite.fromCache("partitioned").savePairs(lines.map(l â (l, l))) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c527a044/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala index b24ba50..4165fd3 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala @@ -19,7 +19,7 @@ package org.apache.ignite.spark.impl class IgniteQueryIterator[T, R] ( cur: java.util.Iterator[T], - conv: (T) => R + conv: (T) â R ) extends Iterator[R] { override def hasNext: Boolean = cur.hasNext http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c527a044/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala index 7cf9f3a..762a6ed 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala @@ -29,7 +29,7 @@ class IgniteSqlRDD[R: ClassTag, T, K, V]( cacheName: String, cacheCfg: CacheConfiguration[K, V], qry: Query[T], - conv: (T) => R + conv: (T) â R ) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) { override def compute(split: Partition, context: TaskContext): Iterator[R] = { new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c527a044/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala index 7af25de..68273da 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala @@ -42,7 +42,7 @@ class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be () â configuration("client", client = true)) // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache. - ic.fromCache(PARTITIONED_CACHE_NAME).saveTuples(sc.parallelize(0 to 10000, 2).map(i â (String.valueOf(i), "val" + i))) + ic.fromCache(PARTITIONED_CACHE_NAME).savePairs(sc.parallelize(0 to 10000, 2).map(i â (String.valueOf(i), "val" + i))) // Check cache contents. val ignite = Ignition.ignite("grid-0") @@ -92,9 +92,9 @@ class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME) - cache.saveTuples(sc.parallelize(0 to 1000, 2).map(i â (String.valueOf(i), new Entity(i, "name" + i, i * 100)))) + cache.savePairs(sc.parallelize(0 to 1000, 2).map(i â (String.valueOf(i), new Entity(i, "name" + i, i * 100)))) - val res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000).map(_._2).collect() + val res: Array[Entity] = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000).map(_._2).collect() assert(res.length == 1, "Invalid result length") assert(50 == res(0).id, "Invalid result") @@ -117,7 +117,7 @@ class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME) - cache.saveTuples(sc.parallelize(0 to 1000, 2).map(i â (String.valueOf(i), new Entity(i, "name" + i, i * 100)))) + cache.savePairs(sc.parallelize(0 to 1000, 2).map(i â (String.valueOf(i), new Entity(i, "name" + i, i * 100)))) val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect() @@ -142,7 +142,7 @@ class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME) - cache.saveTuples(sc.parallelize(0 to 1000, 2).map(i â (String.valueOf(i), new Entity(i, "name" + i, i * 100)))) + cache.savePairs(sc.parallelize(0 to 1000, 2).map(i â (String.valueOf(i), new Entity(i, "name" + i, i * 100)))) val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect()