IGNITE-1061 - Fixed as discussed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b467822d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b467822d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b467822d Branch: refs/heads/ignite-gg-10443 Commit: b467822d55c8f796de2d7b3c2aa80b46f81811c1 Parents: 68c21ac Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Mon Jun 29 20:33:20 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Mon Jun 29 20:33:20 2015 -0700 ---------------------------------------------------------------------- .../core/src/test/config/spark/spark-config.xml | 46 ++++++++++++++++++ .../org/apache/ignite/spark/IgniteContext.scala | 50 ++++++++++++++++++-- .../org/apache/ignite/spark/IgniteRddSpec.scala | 18 +++++++ 3 files changed, 110 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b467822d/modules/core/src/test/config/spark/spark-config.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/spark/spark-config.xml b/modules/core/src/test/config/spark/spark-config.xml new file mode 100644 index 0000000..4b7ffe1 --- /dev/null +++ b/modules/core/src/test/config/spark/spark-config.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd"> + <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + <property name="localHost" value="127.0.0.1"/> + + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <property name="addresses"> + <list> + <value>127.0.0.1:47500</value> + <value>127.0.0.1:47501</value> + <value>127.0.0.1:47502</value> + <value>127.0.0.1:47503</value> + <value>127.0.0.1:47504</value> + </list> + </property> + </bean> + </property> + </bean> + </property> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b467822d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index e52555a..5dbb1d3 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -33,11 +33,13 @@ import org.apache.spark.sql.SQLContext * @tparam V Value type. 
*/ class IgniteContext[K, V]( - @scala.transient val sparkContext: SparkContext, + @transient val sparkContext: SparkContext, cfgF: () ⇒ IgniteConfiguration, client: Boolean = true ) extends Serializable with Logging { - @scala.transient private val driver = true + @transient private val driver = true + + private val cfgClo = new Once(cfgF) if (!client) { val workers = sparkContext.getExecutorStorageStatus.length - 1 @@ -51,6 +53,15 @@ class IgniteContext[K, V]( sparkContext.parallelize(1 to workers, workers).foreach(it ⇒ ignite()) } + // Make sure to start Ignite on context creation. + ignite() + + /** + * Creates an instance of IgniteContext with the given spring configuration. + * + * @param sc Spark context. + * @param springUrl Spring configuration path. + */ def this( sc: SparkContext, springUrl: String @@ -58,6 +69,17 @@ class IgniteContext[K, V]( this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1()) } + /** + * Creates an instance of IgniteContext with default Ignite configuration. + * By default this method will use grid configuration defined in `IGNITE_HOME/config/default-config.xml` + * configuration file. + * + * @param sc Spark context. + */ + def this(sc: SparkContext) { + this(sc, IgnitionEx.DFLT_CFG) + } + val sqlContext = new SQLContext(sparkContext) /** @@ -89,7 +111,7 @@ class IgniteContext[K, V]( * @return Ignite instance. */ def ignite(): Ignite = { - val igniteCfg = cfgF() + val igniteCfg = cfgClo() try { Ignition.ignite(igniteCfg.getGridName) @@ -112,8 +134,28 @@ class IgniteContext[K, V]( * a no-op. */ def close() = { - val igniteCfg = cfgF() + val igniteCfg = cfgClo() Ignition.stop(igniteCfg.getGridName, false) } } + +/** + * Auxiliary closure that ensures that passed in closure is executed only once. + * + * @param clo Closure to wrap. 
+ */ +private class Once(clo: () ⇒ IgniteConfiguration) extends Serializable { + @transient @volatile var res: IgniteConfiguration = null + + def apply(): IgniteConfiguration = { + if (res == null) { + this.synchronized { + if (res == null) + res = clo() + } + } + + res + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b467822d/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala index 26ce693..8fa6949 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala @@ -147,6 +147,24 @@ class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be sc.stop() } } + + it("should successfully start spark context with XML configuration") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, String](sc, + "modules/core/src/test/config/spark/spark-config.xml") + + val cache: IgniteRDD[String, String] = ic.fromCache(PARTITIONED_CACHE_NAME) + + cache.savePairs(sc.parallelize(1 to 1000, 2).map(i ⇒ (String.valueOf(i), "val" + i))) + + assert(1000 == cache.count()) + } + finally { + sc.stop() + } + } } override protected def beforeEach() = {