#IGNITE-389 - Javadoc and API cleanup.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3d1e5342 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3d1e5342 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3d1e5342 Branch: refs/heads/ignite-998 Commit: 3d1e5342f32f56d2479ec7971e8fe2f4adfbf468 Parents: 1552a4b Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Mon Jun 8 16:03:34 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Mon Jun 8 16:03:34 2015 -0700 ---------------------------------------------------------------------- .../spark/examples/java/ColocationTest.java | 89 -------------------- .../examples/java/ExampleConfiguration.java | 31 ------- .../examples/java/IgniteProcessExample.java | 80 ------------------ .../spark/examples/java/IgniteStoreExample.java | 68 --------------- .../spark/examples/java/package-info.java | 21 ----- .../org/apache/ignite/spark/IgniteContext.scala | 30 ++++++- .../org/apache/ignite/spark/IgniteRDD.scala | 41 +++++++-- .../ignite/spark/examples/ColocationTest.scala | 39 --------- .../spark/examples/ExampleConfiguration.scala | 41 --------- .../spark/examples/IgniteProcessExample.scala | 52 ------------ .../spark/examples/IgniteStoreExample.scala | 41 --------- 11 files changed, 62 insertions(+), 471 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java deleted file mode 100644 index 20d6e88..0000000 --- a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spark.examples.java; - -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spark.*; -import org.apache.spark.*; -import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.*; - -import scala.Tuple2; - -import java.util.*; - -/** - * Colocation test example. - */ -public class ColocationTest { - /** Keys count. */ - private static final int KEYS_CNT = 10000; - - /** To pair function. */ - private static final IgniteClosure<Integer, Tuple2<Integer, Integer>> TO_PAIR_F = - new IgniteClosure<Integer, Tuple2<Integer, Integer>>() { - @Override public Tuple2<Integer, Integer> apply(Integer i) { - return new Tuple2<>(i, i); - } - }; - - /** To value function. */ - private static final Function<Tuple2<Integer, Integer>, Integer> TO_VALUE_F = - new Function<Tuple2<Integer, Integer>, Integer>() { - /** {@inheritDoc} */ - @Override public Integer call(Tuple2<Integer, Integer> t) throws Exception { - return t._2(); - } - }; - - /** Sum function. */ - private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() { - public Integer call(Integer x, Integer y) { - return x + y; - } - }; - - /** - * @param args Args. - */ - public static void main(String[] args) { - SparkConf conf = new SparkConf(); - - conf.setAppName("Colocation test"); - - JavaSparkContext sc = new JavaSparkContext(conf); - - JavaIgniteContext<Integer, Integer> ignite = new JavaIgniteContext<>(sc, new ExampleConfiguration()); - - JavaIgniteRDD<Integer, Integer> cache = ignite.fromCache("partitioned"); - - List<Integer> seq = F.range(0, KEYS_CNT + 1); - - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(F.transformList(seq, TO_PAIR_F), 48); - - cache.savePairs(rdd); - - int sum = (KEYS_CNT * KEYS_CNT - KEYS_CNT) / 2; - - // Execute parallel sum. - System.out.println("Local sum: " + sum); - - System.out.println("Distributed sum: " + cache.map(TO_VALUE_F).fold(0, SUM_F)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java deleted file mode 100644 index 5d769f2..0000000 --- a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spark.examples.java; - -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; - -/** - * Ignite example configuration provider. - */ -public class ExampleConfiguration implements IgniteOutClosure<IgniteConfiguration> { - /** {@inheritDoc} */ - @Override public IgniteConfiguration apply() { - return org.apache.ignite.spark.examples.ExampleConfiguration.configuration(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java deleted file mode 100644 index 8994355..0000000 --- a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spark.examples.java; - -import org.apache.ignite.spark.*; -import org.apache.spark.*; -import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.*; - -import scala.*; - -import java.lang.Boolean; - -/** - * Ignite process example. - */ -public class IgniteProcessExample { - /** Filter function. */ - private static final Function<Tuple2<Object, String>, Boolean> FILTER_F = - new Function<Tuple2<Object, String>, Boolean>() { - @Override public Boolean call(Tuple2<Object, String> t) throws Exception { - System.out.println("Analyzing line: " + t._2()); - - return t._2().contains("Ignite"); - } - }; - - /** To value function. */ - private static final Function<Tuple2<Object, String>, String> TO_VALUE_F = - new Function<Tuple2<Object, String>, String>() { - @Override public String call(Tuple2<Object, String> t) throws Exception { - return t._2(); - } - }; - - /** - * @param args Args. - */ - public static void main(String[] args) { - SparkConf conf = new SparkConf(); - - conf.setAppName("Ignite processing example"); - - JavaSparkContext sc = new JavaSparkContext(conf); - - JavaIgniteContext<Object, String> ignite = new JavaIgniteContext<>(sc, new ExampleConfiguration()); - - // Search for lines containing "Ignite". - JavaIgniteRDD<Object, String> scanRdd = ignite.fromCache("partitioned"); - - JavaRDD<String> processedRdd = scanRdd.filter(FILTER_F).map(TO_VALUE_F); - - // Create a new cache for results. - JavaIgniteRDD<Object, String> results = ignite.fromCache("results"); - - results.saveValues(processedRdd); - - // SQL query - ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect(); - - // SQL fields query - DataFrame df = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java deleted file mode 100644 index 24ae77f..0000000 --- a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spark.examples.java; - -import org.apache.ignite.spark.*; -import org.apache.spark.*; -import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.*; - -import scala.*; - -import java.lang.Boolean; - -/** - * Ignite store example. - */ -public class IgniteStoreExample { - /** Predicate. */ - private static final Function<String, Boolean> PREDICATE = new Function<String, Boolean>() { - @Override public Boolean call(String s) throws Exception { - System.out.println("Read line: " + s); - - return s.contains("Ignite"); - } - }; - - /** To pair function. */ - private static final PairFunction<String, String, String> TO_PAIR_F = new PairFunction<String, String, String>() { - @Override public Tuple2<String, String> call(String s) throws Exception { - return new Tuple2<>(s, s); - } - }; - - /** - * @param args Args. - */ - public static void main(String[] args) { - SparkConf conf = new SparkConf(); - - conf.setAppName("Ignite processing example"); - - JavaSparkContext sc = new JavaSparkContext(conf); - - JavaIgniteContext<String, String> ignite = new JavaIgniteContext<>(sc, new ExampleConfiguration()); - - JavaRDD<String> lines = sc.textFile(args[0]).filter(PREDICATE); - - ignite.fromCache("partitioned").saveValues(lines); - - ignite.fromCache("partitioned").savePairs(lines.mapToPair(TO_PAIR_F)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java deleted file mode 100644 index e3243bf..0000000 --- a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Demonstrates usage of Ignite and Spark from Java. - */ -package org.apache.ignite.spark.examples.java; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index 2cfebd6..e52555a 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -21,7 +21,7 @@ package org.apache.ignite.spark import org.apache.ignite.internal.IgnitionEx import org.apache.ignite.{Ignition, Ignite} import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} -import org.apache.spark.SparkContext +import org.apache.spark.{Logging, SparkContext} import org.apache.spark.sql.SQLContext /** @@ -36,7 +36,7 @@ class IgniteContext[K, V]( @scala.transient val sparkContext: SparkContext, cfgF: () â IgniteConfiguration, client: Boolean = true -) extends Serializable { +) extends Serializable with Logging { @scala.transient private val driver = true if (!client) { @@ -45,7 +45,7 @@ class IgniteContext[K, V]( if (workers <= 0) throw new IllegalStateException("No Spark executors found to start Ignite nodes.") - println("Will start Ignite nodes on " + workers + " workers") + logInfo("Will start Ignite nodes on " + workers + " workers") // Start ignite server node on each worker in server mode. sparkContext.parallelize(1 to workers, workers).foreach(it â ignite()) @@ -60,14 +60,34 @@ class IgniteContext[K, V]( val sqlContext = new SQLContext(sparkContext) + /** + * Creates an `IgniteRDD` instance from the given cache name. If the cache does not exist, it will be + * automatically started from template on the first invoked RDD action. + * + * @param cacheName Cache name. + * @return `IgniteRDD` instance. + */ def fromCache(cacheName: String): IgniteRDD[K, V] = { new IgniteRDD[K, V](this, cacheName, null) } + /** + * Creates an `IgniteRDD` instance from the given cache configuration. If the cache does not exist, it will be + * automatically started using the configuration provided on the first invoked RDD action. + * + * @param cacheCfg Cache configuration to use. + * @return `IgniteRDD` instance. + */ def fromCache(cacheCfg: CacheConfiguration[K, V]) = { new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg) } + /** + * Gets an Ignite instance supporting this context. Ignite instance will be started + * if it has not been started yet. + * + * @return Ignite instance. + */ def ignite(): Ignite = { val igniteCfg = cfgF() @@ -87,6 +107,10 @@ class IgniteContext[K, V]( } } + /** + * Stops supporting ignite instance. If ignite instance has been already stopped, this operation will be + * a no-op. + */ def close() = { val igniteCfg = cfgF() http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 5fc457f..2146acb 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -82,7 +82,7 @@ class IgniteRDD[K, V] ( } /** - * Gets prefferred locations for the given partition. + * Gets preferred locations for the given partition. * * @param split Split partition. * @return @@ -94,6 +94,14 @@ class IgniteRDD[K, V] ( .map(_.asInstanceOf[TcpDiscoveryNode].socketAddresses()).flatten.map(_.getHostName).toList } + /** + * Runs an object SQL on corresponding Ignite cache. + * + * @param typeName Type name to run SQL against. + * @param sql SQL query to run. + * @param args Optional SQL query arguments. + * @return RDD with query results. + */ def objectSql(typeName: String, sql: String, args: Any*): RDD[(K, V)] = { val qry: SqlQuery[K, V] = new SqlQuery[K, V](typeName, sql) @@ -102,6 +110,13 @@ class IgniteRDD[K, V] ( new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry â (entry.getKey, entry.getValue)) } + /** + * Runs an SQL fields query. + * + * @param sql SQL statement to run. + * @param args Optional SQL query arguments. + * @return `DataFrame` instance with the query results. + */ def sql(sql: String, args: Any*): DataFrame = { val qry = new SqlFieldsQuery(sql) @@ -114,7 +129,12 @@ class IgniteRDD[K, V] ( ic.sqlContext.createDataFrame(rowRdd, schema) } - def saveValues(rdd: RDD[V], overwrite: Boolean = false) = { + /** + * Saves values from given RDD into Ignite. A unique key will be generated for each value of the given RDD. + * + * @param rdd RDD instance to save values from. + */ + def saveValues(rdd: RDD[V]) = { rdd.foreachPartition(it â { val ig = ic.ignite() @@ -127,8 +147,6 @@ class IgniteRDD[K, V] ( val streamer = ig.dataStreamer[Object, V](cacheName) try { - streamer.allowOverwrite(overwrite) - it.foreach(value â { val key = affinityKeyFunc(value, node.orNull) @@ -141,6 +159,13 @@ class IgniteRDD[K, V] ( }) } + /** + * Saves values from the given key-value RDD into Ignite. + * + * @param rdd RDD instance to save values from. + * @param overwrite Boolean flag indicating whether the call on this method should overwrite existing + * values in Ignite cache. + */ def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false) = { rdd.foreachPartition(it â { val ig = ic.ignite() @@ -163,6 +188,9 @@ class IgniteRDD[K, V] ( }) } + /** + * Removes all values from the underlying Ignite cache. + */ def clear(): Unit = { ensureCache().removeAll() } @@ -197,7 +225,7 @@ class IgniteRDD[K, V] ( case "java.sql.Timestamp" â TimestampType case "[B" â BinaryType - case _ â StructType(new Array[StructField](0)) // TODO Do we need to fill user types? + case _ â StructType(new Array[StructField](0)) } /** @@ -210,6 +238,7 @@ class IgniteRDD[K, V] ( private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = { val aff = ic.ignite().affinity[IgniteUuid](cacheName) - Stream.continually(IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node)).get + Stream.from(1, 1000).map(_ â IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node)) + .getOrElse(IgniteUuid.randomUuid()) } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala deleted file mode 100644 index 29587e4..0000000 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spark.examples - -import org.apache.ignite.spark.IgniteContext -import org.apache.spark.{SparkConf, SparkContext} - -object ColocationTest { - def main(args: Array[String]) { - val conf = new SparkConf().setAppName("Colocation test") - val sc = new SparkContext(conf) - - val ignite = new IgniteContext[Int, Int](sc, ExampleConfiguration.configuration _) - - // Search for lines containing "Ignite". - val cache = ignite.fromCache("partitioned") - - cache.savePairs(sc.parallelize((1 to 100000).toSeq, 48).map(i => (i, i))) - - // Execute parallel sum. - println("Local sum: " + (1 to 100000).sum) - println("Distributed sum: " + cache.map(_._2).sum()) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala deleted file mode 100644 index 3b0dac7..0000000 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ExampleConfiguration.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spark.examples - -import org.apache.ignite.configuration.IgniteConfiguration -import org.apache.ignite.internal.util.lang.{GridFunc => F} -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder - -object ExampleConfiguration { - def configuration(): IgniteConfiguration = { - val cfg = new IgniteConfiguration() - - val discoSpi = new TcpDiscoverySpi() - - val ipFinder = new TcpDiscoveryVmIpFinder() - - ipFinder.setAddresses(F.asList("127.0.0.1:47500", "127.0.0.1:47501")) - - discoSpi.setIpFinder(ipFinder) - - cfg.setDiscoverySpi(discoSpi) - - cfg - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala deleted file mode 100644 index ab91c62..0000000 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spark.examples - -import org.apache.ignite.spark.IgniteContext -import org.apache.spark.rdd.RDD -import org.apache.spark.{SparkContext, SparkConf} - -object IgniteProcessExample { - def main(args: Array[String]) { - val conf = new SparkConf().setAppName("Ignite processing example") - val sc = new SparkContext(conf) - - val ignite = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration _) - - // Search for lines containing "Ignite". - val scanRdd = ignite.fromCache("partitioned") - - val processedRdd = scanRdd.filter(line => { - println("Analyzing line: " + line) - line._2.contains("Ignite") - - true - }).map(_._2) - - // Create a new cache for results. - val results = ignite.fromCache("results") - - results.saveValues(processedRdd) - - // SQL query - ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect() - - // SQL fields query - val df = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d1e5342/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala deleted file mode 100644 index ad6b7e6..0000000 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spark.examples - -import org.apache.ignite.spark.IgniteContext -import org.apache.spark.SparkContext -import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD - -object IgniteStoreExample { - def main(args: Array[String]) { - val conf = new SparkConf().setAppName("Ignite store example") - val sc = new SparkContext(conf) - - val ignite = new IgniteContext[String, String](sc, () â ExampleConfiguration.configuration()) - - val lines: RDD[String] = sc.textFile(args(0)).filter(line â { - println("Read line: " + line) - - line.contains("IGNITE") - }) - - ignite.fromCache("partitioned").saveValues(lines) - ignite.fromCache("partitioned").savePairs(lines.map(l â (l, l))) - } -} \ No newline at end of file