#IGNITE-389 - Tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9bb71baf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9bb71baf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9bb71baf

Branch: refs/heads/ignite-929
Commit: 9bb71bafe9de67bed80de372756b832696d5f0f8
Parents: d72b040
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Wed May 27 00:08:03 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Wed May 27 00:08:03 2015 -0700

----------------------------------------------------------------------
 .../cache/GridCacheAbstractFullApiSelfTest.java |  15 ++
 modules/spark/pom.xml                           |  49 ++--
 .../org/apache/ignite/spark/IgniteContext.scala |   2 +-
 .../org/apache/ignite/spark/IgniteRDD.scala     |  16 +-
 .../spark/examples/IgniteProcessExample.scala   |   8 +-
 .../spark/examples/IgniteStoreExample.scala     |   7 +-
 .../apache/ignite/spark/impl/IgniteSqlRDD.scala |   4 +-
 .../scala/org/apache/ignite/spark/Entity.scala  |  28 +++
 .../org/apache/ignite/spark/IgniteRddSpec.scala | 241 +++++++++++++++++++
 9 files changed, 334 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9bb71baf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 70d8f9c..87f58bfb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -353,6 +353,21 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testRemoveAllSkipStore() throws Exception {
+        IgniteCache<String, Integer> jcache = jcache();
+
+        jcache.putAll(F.asMap("1", 1, "2", 2, "3", 3));
+
+        jcache.withSkipStore().removeAll();
+
+        assertEquals((Integer)1, jcache.get("1"));
+        assertEquals((Integer)2, jcache.get("2"));
+        assertEquals((Integer)3, jcache.get("3"));
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     public void testAtomicOps() throws IgniteCheckedException {
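The new test pins down the withSkipStore() contract: removeAll() on the skip-store decorator drops entries from memory only, so a configured write-through store still holds the values, and the subsequent get() calls read them back through the store. A minimal Scala sketch of the same interaction, assuming a node whose configuration defines a "storeBacked" cache with read/write-through enabled (cache name and setup are illustrative, not from this commit):

    import org.apache.ignite.{Ignite, IgniteCache, Ignition}

    object SkipStoreSketch extends App {
        // Assumes the default configuration defines "storeBacked" with
        // readThrough/writeThrough enabled and a CacheStore attached.
        val ignite: Ignite = Ignition.start()

        val cache: IgniteCache[String, Integer] = ignite.cache[String, Integer]("storeBacked")

        cache.put("1", 1) // Written to both the cache and the store.

        cache.withSkipStore().removeAll() // Clears memory; the store is untouched.

        assert(cache.get("1") == 1) // Read-through reloads the value from the store.
    }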
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9bb71baf/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index aeae234..dc01c76 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -31,7 +31,7 @@
     </parent>
 
     <artifactId>ignite-spark</artifactId>
-    <version>1.0.6-SNAPSHOT</version>
+    <version>1.0.7-SNAPSHOT</version>
 
     <dependencies>
         <dependency>
@@ -47,11 +47,21 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-indexing</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
             <version>2.10.4</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
@@ -67,21 +77,28 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <!--<dependency>-->
-            <!--<groupId>org.apache.spark</groupId>-->
-            <!--<artifactId>spark-sql_2.10</artifactId>-->
-            <!--<version>1.3.1</version>-->
-            <!--<exclusions>-->
-                <!--<exclusion>-->
-                    <!--<groupId>com.twitter</groupId>-->
-                    <!--<artifactId>chill_2.11</artifactId>-->
-                <!--</exclusion>-->
-                <!--<exclusion>-->
-                    <!--<groupId>com.twitter</groupId>-->
-                    <!--<artifactId>chill-java</artifactId>-->
-                <!--</exclusion>-->
-            <!--</exclusions>-->
-        <!--</dependency>-->
+
+        <!-- Test dependencies -->
+
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.10</artifactId>
+            <version>2.2.2</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-indexing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9bb71baf/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 9d9f9a7..a73405b 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -31,7 +31,7 @@ class IgniteContext[K, V](
         sc: SparkContext,
         springUrl: String
     ) {
-        this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1())
+        this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1())
     }
 
     def fromCache(cacheName: String): IgniteRDD[K, V] = {
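For context on the one-line IgniteContext change: the Spring-URL convenience constructor simply delegates to the primary constructor, which takes a configuration closure, so the two creation styles below are equivalent. A sketch under that assumption (the XML path and app name are illustrative):

    import org.apache.ignite.configuration.IgniteConfiguration
    import org.apache.ignite.spark.IgniteContext
    import org.apache.spark.{SparkConf, SparkContext}

    object ContextCreationSketch extends App {
        val sc = new SparkContext(new SparkConf().setAppName("ignite-context-sketch").setMaster("local[*]"))

        // 1. From a Spring XML file (the path is illustrative).
        val fromSpring = new IgniteContext[String, Int](sc, "config/spark-cache.xml")

        // 2. From a configuration closure, which is exactly what the
        // constructor above delegates to.
        val fromClosure = new IgniteContext[String, Int](sc, () ⇒ new IgniteConfiguration())

        sc.stop()
    }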
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9bb71baf/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 6a3720c..ee0e9b3 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -62,25 +62,24 @@ class IgniteRDD[K, V] (
         ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList
     }
 
-    def query(typeName: String, sql: String, args: Any*): RDD[(K, V)] = {
+    def objectSql(typeName: String, sql: String, args: Any*): RDD[(K, V)] = {
         val qry: SqlQuery[K, V] = new SqlQuery[K, V](typeName, sql)
 
-        qry.setArgs(args)
+        qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
 
         new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry => (entry.getKey, entry.getValue))
     }
 
-    def queryFields(sql: String, args: Any*): RDD[Seq[Any]] = {
+    def sql(sql: String, args: Any*): RDD[Seq[Any]] = {
         val qry = new SqlFieldsQuery(sql)
 
-        qry.setArgs(args)
+        qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
 
         new IgniteSqlRDD[Seq[Any], java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list => list)
     }
 
     def saveValues(rdd: RDD[V]) = {
         rdd.foreachPartition(it => {
-            println("Using scala version: " + scala.util.Properties.versionString)
-
             val ig = ic.ignite()
 
             ensureCache()
@@ -95,8 +94,6 @@ class IgniteRDD[K, V] (
                 it.foreach(value => {
                     val key = affinityKeyFunc(value, node.orNull)
 
-                    println("Saving: " + key + ", " + value)
-
                     streamer.addData(key, value)
                 })
             }
@@ -106,9 +103,8 @@ class IgniteRDD[K, V] (
         })
     }
 
-    def save(rdd: RDD[(K, V)]) = {
+    def saveTuples(rdd: RDD[(K, V)]) = {
         rdd.foreachPartition(it => {
-            println("Using scala version: " + scala.util.Properties.versionString)
-
             val ig = ic.ignite()
 
             // Make sure to deploy the cache
@@ -122,8 +118,6 @@ class IgniteRDD[K, V] (
             try {
                 it.foreach(tup => {
-                    println("Saving: " + tup._1 + ", " + tup._2)
-
                    streamer.addData(tup._1, tup._2)
                 })
             }
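Besides the renames (query to objectSql, queryFields to sql), note the setArgs fix: SqlQuery.setArgs takes Java varargs of Object, so the Scala Seq must be boxed element-wise and unpacked with :_*; passing args directly, as before, would bind the whole sequence as a single query argument instead of one value per placeholder. A hedged usage sketch against the renamed API (the "indexed" cache name and the Person value type are illustrative):

    import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
    import org.apache.spark.rdd.RDD

    // Illustrative value class; real use would add query annotations.
    case class Person(age: Int, organizationId: Int, name: String)

    object QuerySketch {
        // 'ic' stands in for an already-created context.
        def run(ic: IgniteContext[String, Person]): Unit = {
            val rdd: IgniteRDD[String, Person] = ic.fromCache("indexed")

            // Object query: each '?' placeholder is bound to one argument.
            val people: RDD[(String, Person)] =
                rdd.objectSql("Person", "age > ? and organizationId = ?", 20, 12)

            // Fields query: rows come back as positional Seq[Any].
            val rows: RDD[Seq[Any]] =
                rdd.sql("select name, age from Person where age > ?", 20)

            println(people.count() + " entries, " + rows.count() + " rows")
        }
    }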
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9bb71baf/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
index e1f4cf0..e1d3326 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.spark.examples
 
+import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.rdd.RDD
 import org.apache.spark.{SparkContext, SparkConf}
 
 object IgniteProcessExample {
@@ -43,9 +45,11 @@ object IgniteProcessExample {
         results.saveValues(processedRdd)
 
         // SQL query
-        ignite.fromCache("indexed").query("Person", "age > ?", 20).collect()
+        ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect()
+
+        ignite.fromCache(new CacheConfiguration[Object, String]("ad"))
 
         // SQL fields query
-        ignite.fromCache("indexed").queryFields("select name, age from Person where age > ?", 20).collect()
+        val sqlRes: RDD[Seq[Any]] = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20)
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9bb71baf/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
index c74804e..24be795 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
@@ -27,14 +27,15 @@ object IgniteStoreExample {
         val conf = new SparkConf().setAppName("Ignite store example")
 
         val sc = new SparkContext(conf)
 
-        val ignite = new IgniteContext[String, String](sc, ExampleConfiguration.configuration _)
+        val ignite = new IgniteContext[String, String](sc, () ⇒ ExampleConfiguration.configuration())
 
-        val lines: RDD[String] = sc.textFile(args(0)).filter(line => {
+        val lines: RDD[String] = sc.textFile(args(0)).filter(line ⇒ {
             println("Read line: " + line)
 
-            true
+            line.contains("IGNITE")
         })
 
         ignite.fromCache("partitioned").saveValues(lines)
+        ignite.fromCache("").saveTuples(lines.map(l ⇒ (l, l)))
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9bb71baf/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
index eea389d..7cf9f3a 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -32,9 +32,7 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
     conv: (T) => R
 ) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) {
     override def compute(split: Partition, context: TaskContext): Iterator[R] = {
-        val it: java.util.Iterator[T] = ensureCache().query(qry).iterator()
-
-        new IgniteQueryIterator[T, R](it, conv)
+        new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv)
     }
 
     override protected def getPartitions: Array[Partition] = {
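The store example above now shows the two write paths side by side: saveValues(RDD[V]) generates cache keys on the Ignite side via the affinity-aware key function in IgniteRDD, while saveTuples(RDD[(K, V)]) writes caller-supplied key-value pairs. In sketch form, assuming a context over String pairs (cache name is illustrative):

    import org.apache.ignite.spark.IgniteContext
    import org.apache.spark.rdd.RDD

    object SavePathsSketch {
        def run(ignite: IgniteContext[String, String], lines: RDD[String]): Unit = {
            // Values only: Ignite picks the keys (see affinityKeyFunc in IgniteRDD).
            ignite.fromCache("partitioned").saveValues(lines)

            // Explicit pairs: the caller controls the keys.
            ignite.fromCache("partitioned").saveTuples(lines.map(l ⇒ (l, l)))
        }
    }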
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9bb71baf/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala
new file mode 100644
index 0000000..00beac6
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.spark.IgniteRddSpec.ScalarCacheQuerySqlField
+
+class Entity (
+    @ScalarCacheQuerySqlField(index = true) val id: Int,
+    @ScalarCacheQuerySqlField(index = true) val name: String,
+    @ScalarCacheQuerySqlField(index = true) val salary: Int
+) extends Serializable {
+
+}
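Entity relies on the ScalarCacheQuerySqlField alias declared in the IgniteRddSpec companion object below. The alias matters in Scala: an annotation on a constructor val applies to the constructor parameter by default, where Ignite's indexing would never see it; the @field meta-annotation retargets it to the generated field. A minimal sketch of the pattern (object and class names are illustrative):

    import scala.annotation.meta.field
    import org.apache.ignite.cache.query.annotations.QuerySqlField

    object Aliases {
        // Retarget the Java annotation from the constructor parameter to the field.
        type IndexedSqlField = QuerySqlField @field
    }

    import Aliases._

    // 'amount' ends up as an indexed, SQL-visible field on the compiled class.
    class Salary(@IndexedSqlField(index = true) val amount: Int) extends Serializable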
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9bb71baf/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
new file mode 100644
index 0000000..7af25de
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignition
+import org.apache.ignite.cache.query.annotations.{QueryTextField, QuerySqlField}
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
+import org.apache.spark.SparkContext
+import org.junit.runner.RunWith
+import org.scalatest._
+import org.scalatest.junit.JUnitRunner
+
+import IgniteRddSpec._
+
+import scala.annotation.meta.field
+
+@RunWith(classOf[JUnitRunner])
+class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
+    describe("IgniteRDD") {
+        it("should successfully store data to ignite") {
+            val sc = new SparkContext("local[*]", "test")
+
+            try {
+                val ic = new IgniteContext[String, String](sc,
+                    () ⇒ configuration("client", client = true))
+
+                // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache.
+                ic.fromCache(PARTITIONED_CACHE_NAME).saveTuples(sc.parallelize(0 to 10000, 2).map(i ⇒ (String.valueOf(i), "val" + i)))
+
+                // Check cache contents.
+                val ignite = Ignition.ignite("grid-0")
+
+                for (i ← 0 to 10000) {
+                    val res = ignite.cache[String, String](PARTITIONED_CACHE_NAME).get(String.valueOf(i))
+
+                    assert(res != null, "Value was not put to cache for key: " + i)
+                    assert("val" + i == res, "Invalid value stored for key: " + i)
+                }
+            }
+            finally {
+                sc.stop()
+            }
+        }
+
+        it("should successfully read data from ignite") {
+            val sc = new SparkContext("local[*]", "test")
+
+            try {
+                val cache = Ignition.ignite("grid-0").cache[String, Int](PARTITIONED_CACHE_NAME)
+
+                val num = 10000
+
+                for (i ← 0 to num) {
+                    cache.put(String.valueOf(i), i)
+                }
+
+                val ic = new IgniteContext[String, Int](sc,
+                    () ⇒ configuration("client", client = true))
+
+                val res = ic.fromCache(PARTITIONED_CACHE_NAME).map(_._2).sum()
+
+                assert(res == (0 to num).sum)
+            }
+            finally {
+                sc.stop()
+            }
+        }
+
+        it("should successfully query objects from ignite") {
+            val sc = new SparkContext("local[*]", "test")
+
+            try {
+                val ic = new IgniteContext[String, Entity](sc,
+                    () ⇒ configuration("client", client = true))
+
+                val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME)
+
+                cache.saveTuples(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100))))
+
+                val res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000).map(_._2).collect()
+
+                assert(res.length == 1, "Invalid result length")
+                assert(50 == res(0).id, "Invalid result")
+                assert("name50" == res(0).name, "Invalid result")
+                assert(5000 == res(0).salary)
+
+                assert(500 == cache.objectSql("Entity", "id > 500").count(), "Invalid count")
+            }
+            finally {
+                sc.stop()
+            }
+        }
+
+        it("should successfully query fields from ignite") {
+            val sc = new SparkContext("local[*]", "test")
+
+            try {
+                val ic = new IgniteContext[String, Entity](sc,
+                    () ⇒ configuration("client", client = true))
+
+                val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME)
+
+                cache.saveTuples(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100))))
+
+                val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect()
+
+                assert(res.length == 1, "Invalid result length")
+                assert(50 == res(0).head, "Invalid result")
+                assert("name50" == res(0)(1), "Invalid result")
+                assert(5000 == res(0)(2), "Invalid result")
+
+                assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count")
+            }
+            finally {
+                sc.stop()
+            }
+        }
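A note on consuming sql() results, since the fields test above indexes into rows positionally: each row is an untyped Seq[Any] in select-list order, so typed access needs casts. A small sketch under the same Entity schema (the salary threshold is illustrative):

    import org.apache.ignite.spark.IgniteRDD

    object FieldsQuerySketch {
        // 'cache' is an IgniteRDD[String, Entity], as in the tests above.
        def dump(cache: IgniteRDD[String, Entity]): Unit = {
            val rows = cache.sql("select id, name, salary from Entity where salary > ?", 5000).collect()

            rows.foreach { row ⇒
                val id = row(0).asInstanceOf[Int]       // Columns are positional...
                val name = row(1).asInstanceOf[String]  // ...and untyped, hence the casts.
                val salary = row(2).asInstanceOf[Int]

                println(id + ": " + name + " earns " + salary)
            }
        }
    }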
and salary = ?", "name50", 5000).collect() + + assert(res.length == 1, "Invalid result length") + assert(50 == res(0).head, "Invalid result") + assert("name50" == res(0)(1), "Invalid result") + assert(5000 == res(0)(2), "Invalid result") + + assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count") + } + finally { + sc.stop() + } + } + } + + override protected def beforeEach() = { + Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll() + } + + override protected def afterEach() = { + Ignition.stop("client", false) + } + + override protected def beforeAll() = { + for (i â 0 to 3) { + Ignition.start(configuration("grid-" + i, client = false)) + } + } + + override protected def afterAll() = { + for (i â 0 to 3) { + Ignition.stop("grid-" + i, false) + } + } +} + +/** + * Constants and utility methods. + */ +object IgniteRddSpec { + /** IP finder for the test. */ + val IP_FINDER = new TcpDiscoveryVmIpFinder(true) + + /** Partitioned cache name. */ + val PARTITIONED_CACHE_NAME = "partitioned" + + /** Type alias for `QuerySqlField`. */ + type ScalarCacheQuerySqlField = QuerySqlField @field + + /** Type alias for `QueryTextField`. */ + type ScalarCacheQueryTextField = QueryTextField @field + + /** + * Gets ignite configuration. + * + * @param gridName Grid name. + * @param client Client mode flag. + * @return Ignite configuration. + */ + def configuration(gridName: String, client: Boolean): IgniteConfiguration = { + val cfg = new IgniteConfiguration + + val discoSpi = new TcpDiscoverySpi + + discoSpi.setIpFinder(IgniteRddSpec.IP_FINDER) + + cfg.setDiscoverySpi(discoSpi) + + cfg.setCacheConfiguration(cacheConfiguration(gridName)) + + cfg.setClientMode(client) + + cfg.setGridName(gridName) + + cfg + } + + /** + * Gets cache configuration for the given grid name. + * + * @param gridName Grid name. + * @return Cache configuration. + */ + def cacheConfiguration(gridName: String): CacheConfiguration[Object, Object] = { + val ccfg = new CacheConfiguration[Object, Object]() + + ccfg.setBackups(1) + + ccfg.setName(PARTITIONED_CACHE_NAME) + + ccfg.setIndexedTypes(classOf[String], classOf[Entity]) + + ccfg + } +}