IGNITE-389 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/389ec79d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/389ec79d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/389ec79d

Branch: refs/heads/ignite-389
Commit: 389ec79dff949fda2c7c16c84141b1b5f86793b5
Parents: edd1a95
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Tue May 19 19:51:43 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Tue May 19 19:51:43 2015 -0700

----------------------------------------------------------------------
 modules/spark/pom.xml                           | 47 ++++++-----
 .../org/apache/ignite/spark/IgniteContext.scala | 85 +++++++++++++++-----
 .../org/apache/ignite/spark/IgniteRDD.scala     | 13 +--
 .../spark/examples/ExampleConfiguration.scala   | 41 ++++++++++
 .../spark/examples/IgniteProcessExample.scala   | 44 ++++++++++
 .../spark/examples/IgniteStoreExample.scala     | 40 +++++++++
 .../spark/util/SerializablePredicate2.scala     | 32 ++++++++
 .../org/apache/ignite/spark/util/using.scala    | 32 --------
 8 files changed, 248 insertions(+), 86 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 9d4ea86..aeae234 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -48,37 +48,40 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-scalar</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
-            <version>2.11.2</version>
+            <version>2.10.4</version>
         </dependency>
         <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_2.11</artifactId>
-            <version>2.2.4</version>
-            <scope>test</scope>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <version>1.3.1</version>
             <exclusions>
                 <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
+                    <groupId>com.twitter</groupId>
+                    <artifactId>chill_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.twitter</groupId>
+                    <artifactId>chill-java</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
-            <version>1.3.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
-            <version>1.3.1</version>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.apache.spark</groupId>-->
+            <!--<artifactId>spark-sql_2.10</artifactId>-->
+            <!--<version>1.3.1</version>-->
+            <!--<exclusions>-->
+                <!--<exclusion>-->
+                    <!--<groupId>com.twitter</groupId>-->
+                    <!--<artifactId>chill_2.11</artifactId>-->
+                <!--</exclusion>-->
+                <!--<exclusion>-->
+                    <!--<groupId>com.twitter</groupId>-->
+                    <!--<artifactId>chill-java</artifactId>-->
+                <!--</exclusion>-->
+            <!--</exclusions>-->
+        <!--</dependency>-->
     </dependencies>

     <build>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 0913605..56d2a05 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -17,42 +17,51 @@
 package org.apache.ignite.spark

+import javax.cache.Cache
+
+import org.apache.ignite.cache.query.{Query, ScanQuery}
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.internal.IgnitionEx
-import org.apache.ignite.spark.util.using
+import org.apache.ignite.lang.IgniteUuid
+import org.apache.ignite.spark.util.SerializablePredicate2
 import org.apache.ignite.{Ignition, IgniteCache, Ignite}
 import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD

+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
 class IgniteContext[K, V](
-    sc: SparkContext,
-    igniteCfg: IgniteConfiguration,
+    @scala.transient sc: SparkContext,
+    cfgF: () => IgniteConfiguration,
     val cacheName: String,
     cacheCfg: CacheConfiguration[K, V]
-) {
+) extends Serializable {
+    type ScanRDD[K1, V1] = IgniteRDD[Cache.Entry[K1, V1], K1, V1]
+
     def this(
         sc: SparkContext,
         springUrl: String,
         cacheName: String
     ) {
-        this(sc, IgnitionEx.loadConfiguration(springUrl).get1(), cacheName, null)
+        this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1(), cacheName, null)
     }

     def this(
         sc: SparkContext,
-        igniteCfg: IgniteConfiguration,
+        cfgF: () => IgniteConfiguration,
         cacheName: String
     ) {
-        this(sc, igniteCfg, cacheName, null)
+        this(sc, cfgF, cacheName, null)
     }

     def this(
         sc: SparkContext,
-        igniteCfg: IgniteConfiguration,
+        cfgF: () => IgniteConfiguration,
         cacheCfg: CacheConfiguration[K, V]
     ) {
-        this(sc, igniteCfg, cacheCfg.getName, cacheCfg)
+        this(sc, cfgF, cacheCfg.getName, cacheCfg)
     }

     def this(
@@ -60,43 +69,75 @@ class IgniteContext[K, V](
         springUrl: String,
         cacheCfg: CacheConfiguration[K, V]
     ) {
-        this(sc, IgnitionEx.loadConfiguration(springUrl).get1(), cacheCfg.getName, cacheCfg)
+        this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1(), cacheCfg.getName, cacheCfg)
     }

     def sparkContext() = sc

-    def saveToIgnite(rdd: RDD[V], keyFunc: (IgniteContext[K, V], V, ClusterNode) => K = affinityKeyFunc) = {
+    def scan(p: (K, V) => Boolean = (_, _) => true): ScanRDD[K, V] = {
+        new ScanRDD(this, new ScanQuery[K, V](new SerializablePredicate2[K, V](p)))
+    }
+
+    def scan[R:ClassTag](qry: Query[R]): IgniteRDD[R, K, V] = {
+        new IgniteRDD[R, K, V](this, qry)
+    }
+
+    def saveToIgnite[T](rdd: RDD[V], keyFunc: (IgniteContext[K, V], V, ClusterNode) => T = affinityKeyFunc(_: IgniteContext[K, V], _:V, _: ClusterNode)) = {
         rdd.foreachPartition(it => {
-            // TODO get affinity node
+            println("Using scala version: " + scala.util.Properties.versionString)
+            // Make sure to deploy the cache
+            igniteCache()
+
+            val ig = ignite()

-            using(ignite().dataStreamer[K, V](cacheName)) { streamer =>
+            val locNode = ig.cluster().localNode()
+
+            val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
+
+            val streamer = ignite().dataStreamer[T, V](cacheName)
+
+            try {
                 it.foreach(value => {
-                    val key: K = keyFunc(this, value, null)
+                    val key: T = keyFunc(this, value, node.orNull)
+
+                    println("Saving: " + key + ", " + value)
+
                     streamer.addData(key, value)
                 })
             }
+            finally {
+                streamer.close()
+            }
         })
     }

     def ignite(): Ignite = {
+        val igniteCfg = cfgF()
+
         try {
             Ignition.ignite(igniteCfg.getGridName)
         }
         catch {
             case e: Exception =>
-                igniteCfg.setClientMode(true)
+                try {
+                    igniteCfg.setClientMode(true)

-                Ignition.start(igniteCfg)
+                    Ignition.start(igniteCfg)
+                }
+                catch {
+                    case e: Exception => Ignition.ignite(igniteCfg.getGridName)
+                }
         }
     }

-    def igniteCache(): IgniteCache[K, V] = {
-//        new IgniteRDD[Object, K, V](this, (k: K, v: V) => {true})
-
-        ignite().cache(cacheName)
+    private def igniteCache(): IgniteCache[K, V] = {
+        if (cacheCfg == null)
+            ignite().getOrCreateCache(cacheName)
+        else
+            ignite().getOrCreateCache(cacheCfg)
     }

-    private def affinityKeyFunc(ic: IgniteContext[K, V], key: K, node: ClusterNode) = {
-        null
+    private def affinityKeyFunc(ic: IgniteContext[K, V], value: V, node: ClusterNode): Object = {
+        IgniteUuid.randomUuid()
     }
 }
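
For reference (not part of the commit): a rough driver-side sketch of how the reworked IgniteContext is meant to be called, mirroring the example programs added further down in this patch. The local master setting and the sample data are assumptions.

// Sketch only: exercises the function-based configuration, scan() and saveToIgnite()
// introduced above. Assumes the ExampleConfiguration helper and the "partitioned"
// cache name from this patch, and a local Spark master for illustration.
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.spark.examples.ExampleConfiguration
import org.apache.spark.{SparkConf, SparkContext}

object IgniteContextSketch {
    def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("IgniteContext sketch").setMaster("local[2]"))

        // The configuration is passed as a no-arg function rather than an IgniteConfiguration
        // instance, so the (now Serializable) context can be shipped to executors and each
        // executor can start or join a client node lazily via ignite().
        val ic = new IgniteContext[String, String](sc, ExampleConfiguration.configuration _, "partitioned")

        // Stream an RDD[V] into the cache; keys default to random IgniteUuids via affinityKeyFunc.
        ic.saveToIgnite(sc.parallelize(Seq("alpha", "beta", "gamma")))

        // Read entries back, filtered server-side by the ScanQuery predicate built in scan().
        ic.scan((_, v) => v.startsWith("a")).foreach(e => println(e.getKey + " -> " + e.getValue))

        sc.stop()
    }
}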

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 186d1ae..4018c53 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -17,25 +17,18 @@
 package org.apache.ignite.spark

-import org.apache.ignite.cache.query.{ScanQuery, Query}
-import org.apache.ignite.scalar.lang.ScalarPredicate2
+import org.apache.ignite.cache.query.Query
 import org.apache.ignite.spark.impl.{IgniteQueryIterator, IgnitePartition}
 import org.apache.spark.{TaskContext, Partition}
 import org.apache.spark.rdd.RDD

 import scala.collection.JavaConversions._
+import scala.reflect.ClassTag

-class IgniteRDD[R, K, V](
+class IgniteRDD[R:ClassTag, K, V](
     ic: IgniteContext[K, V],
     qry: Query[R]
 ) extends RDD[R] (ic.sparkContext(), deps = Nil) {
-    def this(
-        ic: IgniteContext[K, V],
-        p: (K, V) => Boolean
-    ) = {
-        this(ic, new ScanQuery[K, V](new ScalarPredicate2[K, V](p)))
-    }
-
     override def compute(part: Partition, context: TaskContext): Iterator[R] = {
         new IgniteQueryIterator[R, K, V](ic, part, qry)
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala
new file mode 100644
index 0000000..3b0dac7
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples
+
+import org.apache.ignite.configuration.IgniteConfiguration
+import org.apache.ignite.internal.util.lang.{GridFunc => F}
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
+
+object ExampleConfiguration {
+    def configuration(): IgniteConfiguration = {
+        val cfg = new IgniteConfiguration()
+
+        val discoSpi = new TcpDiscoverySpi()
+
+        val ipFinder = new TcpDiscoveryVmIpFinder()
+
+        ipFinder.setAddresses(F.asList("127.0.0.1:47500", "127.0.0.1:47501"))
+
+        discoSpi.setIpFinder(ipFinder)
+
+        cfg.setDiscoverySpi(discoSpi)
+
+        cfg
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
new file mode 100644
index 0000000..4aeecb0
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples
+
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.{SparkContext, SparkConf}
+
+object IgniteProcessExample {
+    def main(args: Array[String]) {
+        val conf = new SparkConf().setAppName("Ignite processing example")
+        val sc = new SparkContext(conf)
+
+        val partitioned = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration _, "partitioned")
+
+        // Search for lines containing "Ignite".
+        val scan = partitioned.scan((k, v) => v.contains("Ignite"))
+
+        val processed = scan.filter(line => {
+            println("Analyzing line: " + line)
+
+            true
+        }).map(_.getValue)
+
+        // Create a new cache for results.
+        val results = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration _, "results")
+
+        results.saveToIgnite(processed)
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
new file mode 100644
index 0000000..a7823f4
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples
+
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+
+object IgniteStoreExample {
+    def main(args: Array[String]) {
+        val conf = new SparkConf().setAppName("Ignite store example")
+        val sc = new SparkContext(conf)
+
+        val ignite = new IgniteContext[String, String](sc, ExampleConfiguration.configuration _, "partitioned")
+
+        val lines: RDD[String] = sc.textFile(args(0)).filter(line => {
+            println("Read line: " + line)
+
+            true
+        })
+
+        ignite.saveToIgnite(lines)
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
new file mode 100644
index 0000000..484d0df
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.util
+
+import org.apache.ignite.lang.IgniteBiPredicate
+
+/**
+ * Peer deploy aware adapter for Java's `GridPredicate2`.
+ */
+class SerializablePredicate2[T1, T2](private val p: (T1, T2) => Boolean) extends IgniteBiPredicate[T1, T2] {
+    assert(p != null)
+
+    /**
+     * Delegates to passed in function.
+     */
+    def apply(e1: T1, e2: T2) = p(e1, e2)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/389ec79d/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala
deleted file mode 100644
index 3b46d16..0000000
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/util/using.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark.util
-
-import scala.util.Try
-
-object using {
-    type AutoClosable = { def close(): Unit }
-
-    def apply[A <: AutoClosable, B](resource: A)(code: A => B): B =
-        try {
-            code(resource)
-        }
-        finally {
-            Try(resource.close())
-        }
-}
\ No newline at end of file
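
For reference (not part of the commit): a standalone sketch of the new SerializablePredicate2 adapter driving a plain ScanQuery, which is the same wrapping that IgniteContext.scan() performs. The cache name and sample entries are made up for illustration.

// Sketch only: wraps a Scala (K, V) => Boolean closure as an IgniteBiPredicate so it can
// be handed to a ScanQuery. Assumes a default single-node Ignite started in this JVM.
import org.apache.ignite.Ignition
import org.apache.ignite.cache.query.ScanQuery
import org.apache.ignite.spark.util.SerializablePredicate2

import scala.collection.JavaConversions._

object ScanPredicateSketch {
    def main(args: Array[String]) {
        val ignite = Ignition.start()

        try {
            val cache = ignite.getOrCreateCache[String, String]("words")

            Seq("ant", "bee", "cat").foreach(w => cache.put(w, w.toUpperCase))

            // The closure travels with the query and is evaluated on the server side,
            // so only matching entries come back to the caller.
            val qry = new ScanQuery(new SerializablePredicate2[String, String]((k, _) => k.startsWith("a")))

            cache.query(qry).getAll.foreach(e => println(e.getKey + " -> " + e.getValue))
        }
        finally {
            ignite.close()
        }
    }
}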