#IGNITE-389 - Adding test, fixing colocation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f00a9e99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f00a9e99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f00a9e99 Branch: refs/heads/ignite-929 Commit: f00a9e9980aefeba2d80969730552e5c5651f1c6 Parents: d151244 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed May 27 23:30:49 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed May 27 23:30:49 2015 -0700 ---------------------------------------------------------------------- modules/spark/pom.xml | 8 ---- .../org/apache/ignite/spark/IgniteRDD.scala | 8 ++-- .../ignite/spark/examples/ColocationTest.scala | 40 ++++++++++++++++++++ 3 files changed, 45 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f00a9e99/modules/spark/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml index dc01c76..84055d6 100644 --- a/modules/spark/pom.xml +++ b/modules/spark/pom.xml @@ -49,14 +49,6 @@ </dependency> <dependency> - <groupId>org.apache.ignite</groupId> - <artifactId>ignite-indexing</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.4</version> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f00a9e99/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index ee0e9b3..30efa7a 100644 --- 
a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -23,6 +23,7 @@ import org.apache.ignite.cluster.ClusterNode import org.apache.ignite.configuration.CacheConfiguration import org.apache.ignite.lang.IgniteUuid import org.apache.ignite.spark.impl.{IgniteAbstractRDD, IgniteSqlRDD, IgnitePartition, IgniteQueryIterator} +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode import org.apache.spark.rdd.RDD import org.apache.spark.{TaskContext, Partition} @@ -37,9 +38,9 @@ class IgniteRDD[K, V] ( override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { val cache = ensureCache() - val qry: ScanQuery[K, V] = new ScanQuery[K, V]() + val qry: ScanQuery[K, V] = new ScanQuery[K, V](part.index) - qry.setPartition(part.index) + val partNodes = ic.ignite().affinity(cache.getName).mapPartitionToPrimaryAndBackups(part.index) val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator() @@ -59,7 +60,8 @@ class IgniteRDD[K, V] ( override protected def getPreferredLocations(split: Partition): Seq[String] = { ensureCache() - ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList + ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index) + .map(_.asInstanceOf[TcpDiscoveryNode].socketAddresses()).flatten.map(_.getHostName).toList } def objectSql(typeName: String, sql: String, args: Any*): RDD[(K, V)] = { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f00a9e99/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala new file mode 100644 index 0000000..a0814fa --- /dev/null +++ 
b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.examples + +import org.apache.ignite.configuration.IgniteConfiguration +import org.apache.ignite.spark.IgniteContext +import org.apache.spark.{SparkContext, SparkConf} + +object ColocationTest { + def main(args: Array[String]) { + val conf = new SparkConf().setAppName("Colocation test") + val sc = new SparkContext(conf) + + val ignite = new IgniteContext[Int, Int](sc, () ⇒ new IgniteConfiguration()) + + // Search for lines containing "Ignite". + val cache = ignite.fromCache("partitioned") + + cache.saveTuples(sc.parallelize((1 to 100000).toSeq, 48).map(i => (i, i))) + + // Execute parallel sum. + println("Local sum: " + (1 to 100000).sum) + println("Distributed sum: " + cache.map(_._2).sum()) + } +} \ No newline at end of file