http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala deleted file mode 100644 index 2656f44..0000000 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.scalar.examples - -import java.lang.{Integer => JavaInt} -import java.util.ConcurrentModificationException -import javax.cache.Cache - -import org.apache.ignite.IgniteCache -import org.apache.ignite.cache.CacheMode -import org.apache.ignite.scalar.scalar -import org.apache.ignite.scalar.scalar._ -import org.jsr166.ThreadLocalRandom8 - -import scala.collection.JavaConversions._ - -/** - * <a href="http://en.wikipedia.org/wiki/Snowflake_schema">Snowflake Schema</a> is a logical - * arrangement of data in which data is split into `dimensions` and `facts` - * <i>Dimensions</i> can be referenced or joined by other <i>dimensions</i> or <i>facts</i>, - * however, <i>facts</i> are generally not referenced by other facts. You can view <i>dimensions</i> - * as your master or reference data, while <i>facts</i> are usually large data sets of events or - * other objects that continuously come into the system and may change frequently. In Ignite - * such architecture is supported via cross-cache queries. By storing <i>dimensions</i> in - * `CacheMode#REPLICATED REPLICATED` caches and <i>facts</i> in much larger - * `CacheMode#PARTITIONED PARTITIONED` caches you can freely execute distributed joins across - * your whole in-memory data ignite cluster, thus querying your in memory data without any limitations. - * <p/> - * In this example we have two <i>dimensions</i>, `DimProduct` and `DimStore` and - * one <i>fact</i> - `FactPurchase`. Queries are executed by joining dimensions and facts - * in various ways. - * <p/> - * Remote nodes should be started using `ExampleNodeStartup` which will - * start node with `examples/config/example-ignite.xml` configuration. - */ -object ScalarSnowflakeSchemaExample { - /** Configuration file name. */ - private val CONFIG = "examples/config/example-ignite.xml" - - /** Name of replicated cache specified in spring configuration. */ - private val REPL_NAME = "ScalarSnowflakeSchemaExampleReplicated" - - /** Name of partitioned cache specified in spring configuration. */ - private val PART_NAME = "ScalarSnowflakeSchemaExamplePartitioned" - - /** ID generator. */ - private[this] val idGen = Stream.from(System.currentTimeMillis.toInt).iterator - - /** DimStore data. */ - private[this] val dataStore = scala.collection.mutable.Map[JavaInt, DimStore]() - - /** DimProduct data. */ - private[this] val dataProduct = scala.collection.mutable.Map[JavaInt, DimProduct]() - - /** - * Example entry point. No arguments required. - */ - def main(args: Array[String]) { - scalar(CONFIG) { - val dimCache = createCache$[JavaInt, AnyRef](REPL_NAME, CacheMode.REPLICATED, Seq(classOf[JavaInt], classOf[DimStore], - classOf[JavaInt], classOf[DimProduct])) - - try { - val factCache = createCache$[JavaInt, FactPurchase](PART_NAME, indexedTypes = Seq(classOf[JavaInt], classOf[FactPurchase])) - - try { - populateDimensions(dimCache) - populateFacts(factCache) - - queryStorePurchases() - queryProductPurchases() - } - finally { - factCache.close() - } - } - finally { - dimCache.close() - } - } - } - - /** - * Populate cache with `dimensions` which in our case are - * `DimStore` and `DimProduct` instances. - */ - def populateDimensions(dimCache: IgniteCache[JavaInt, AnyRef]) { - val store1 = new DimStore(idGen.next(), "Store1", "12345", "321 Chilly Dr, NY") - val store2 = new DimStore(idGen.next(), "Store2", "54321", "123 Windy Dr, San Francisco") - - // Populate stores. - dimCache.put(store1.id, store1) - dimCache.put(store2.id, store2) - - dataStore.put(store1.id, store1) - dataStore.put(store2.id, store2) - - for (i <- 1 to 20) { - val product = new DimProduct(idGen.next(), "Product" + i, i + 1, (i + 1) * 10) - - dimCache.put(product.id, product) - - dataProduct.put(product.id, product) - } - } - - /** - * Populate cache with `facts`, which in our case are `FactPurchase` objects. - */ - def populateFacts(factCache: IgniteCache[JavaInt, FactPurchase]) { - for (i <- 1 to 100) { - val store: DimStore = rand(dataStore.values) - val prod: DimProduct = rand(dataProduct.values) - val purchase: FactPurchase = new FactPurchase(idGen.next(), prod.id, store.id, i + 1) - - factCache.put(purchase.id, purchase) - } - } - - /** - * Query all purchases made at a specific store. This query uses cross-cache joins - * between `DimStore` objects stored in `replicated` cache and - * `FactPurchase` objects stored in `partitioned` cache. - */ - def queryStorePurchases() { - val factCache = ignite$.cache[JavaInt, FactPurchase](PART_NAME) - - val storePurchases = factCache.sql( - "from \"" + REPL_NAME + "\".DimStore, \"" + PART_NAME + "\".FactPurchase " + - "where DimStore.id=FactPurchase.storeId and DimStore.name=?", "Store1") - - printQueryResults("All purchases made at store1:", storePurchases.getAll) - } - - /** - * Query all purchases made at a specific store for 3 specific products. - * This query uses cross-cache joins between `DimStore`, `DimProduct` - * objects stored in `replicated` cache and `FactPurchase` objects - * stored in `partitioned` cache. - */ - private def queryProductPurchases() { - val factCache = ignite$.cache[JavaInt, FactPurchase](PART_NAME) - - // All purchases for certain product made at store2. - // ================================================= - val p1: DimProduct = rand(dataProduct.values) - val p2: DimProduct = rand(dataProduct.values) - val p3: DimProduct = rand(dataProduct.values) - - println("IDs of products [p1=" + p1.id + ", p2=" + p2.id + ", p3=" + p3.id + ']') - - val prodPurchases = factCache.sql( - "from \"" + REPL_NAME + "\".DimStore, \"" + REPL_NAME + "\".DimProduct, \"" + - PART_NAME + "\".FactPurchase " + - "where DimStore.id=FactPurchase.storeId and " + - "DimProduct.id=FactPurchase.productId and " + - "DimStore.name=? and DimProduct.id in(?, ?, ?)", - "Store2", p1.id, p2.id, p3.id) - - printQueryResults("All purchases made at store2 for 3 specific products:", prodPurchases.getAll) - } - - /** - * Print query results. - * - * @param msg Initial message. - * @param res Results to print. - */ - private def printQueryResults[V](msg: String, res: Iterable[Cache.Entry[JavaInt, V]]) { - println(msg) - - for (e <- res) - println(" " + e.getValue.toString) - } - - /** - * Gets random value from given collection. - * - * @param c Input collection (no `null` and not emtpy). - * @return Random value from the input collection. - */ - def rand[T](c: Iterable[_ <: T]): T = { - val n: Int = ThreadLocalRandom8.current.nextInt(c.size) - - var i: Int = 0 - - for (t <- c) { - if (i < n) - i += 1 - else - return t - } - - throw new ConcurrentModificationException - } -} - -/** - * Represents a physical store location. In our `snowflake` schema a `store` - * is a `dimension` and will be cached in `CacheMode#REPLICATED` cache. - * - * @param id Primary key. - * @param name Store name. - * @param zip Zip code. - * @param addr Address. - */ -class DimStore( - @ScalarCacheQuerySqlField - val id: Int, - @ScalarCacheQuerySqlField - val name: String, - val zip: String, - val addr: String) { - /** - * `toString` implementation. - */ - override def toString: String = { - val sb: StringBuilder = new StringBuilder - - sb.append("DimStore ") - sb.append("[id=").append(id) - sb.append(", name=").append(name) - sb.append(", zip=").append(zip) - sb.append(", addr=").append(addr) - sb.append(']') - - sb.toString() - } -} - -/** - * Represents a product available for purchase. In our `snowflake` schema a `product` - * is a `dimension` and will be cached in `CacheMode#REPLICATED` cache. - * - * @param id Product ID. - * @param name Product name. - * @param price Product list price. - * @param qty Available product quantity. - */ -class DimProduct( - @ScalarCacheQuerySqlField - val id: Int, - val name: String, - @ScalarCacheQuerySqlField - val price: Float, - val qty: Int) { - /** - * `toString` implementation. - */ - override def toString: String = { - val sb: StringBuilder = new StringBuilder - - sb.append("DimProduct ") - sb.append("[id=").append(id) - sb.append(", name=").append(name) - sb.append(", price=").append(price) - sb.append(", qty=").append(qty) - sb.append(']') - - sb.toString() - } -} - -/** - * Represents a purchase record. In our `snowflake` schema purchase - * is a `fact` and will be cached in larger `CacheMode#PARTITIONED` cache. - * - * @param id Purchase ID. - * @param productId Purchased product ID. - * @param storeId Store ID. - * @param purchasePrice Purchase price. - */ -class FactPurchase( - @ScalarCacheQuerySqlField - val id: Int, - @ScalarCacheQuerySqlField - val productId: Int, - @ScalarCacheQuerySqlField - val storeId: Int, - @ScalarCacheQuerySqlField - val purchasePrice: Float) { - /** - * `toString` implementation. - */ - override def toString: String = { - val sb: StringBuilder = new StringBuilder - - sb.append("FactPurchase ") - sb.append("[id=").append(id) - sb.append(", productId=").append(productId) - sb.append(", storeId=").append(storeId) - sb.append(", purchasePrice=").append(purchasePrice) - sb.append(']') - - sb.toString() - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarTaskExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarTaskExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarTaskExample.scala deleted file mode 100644 index 21073e5..0000000 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarTaskExample.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.scalar.examples - -import java.util - -import org.apache.ignite.compute.{ComputeJob, ComputeJobResult, ComputeTaskSplitAdapter} -import org.apache.ignite.scalar.scalar -import org.apache.ignite.scalar.scalar._ - -import scala.collection.JavaConversions._ - -/** - * Demonstrates use of full ignite task API using Scalar. Note that using task-based - * ignite enabling gives you all the advanced features of Ignite such as custom topology - * and collision resolution, custom failover, mapping, reduction, load balancing, etc. - * As a trade off in such cases the more code needs to be written vs. simple closure execution. - * <p/> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. - * <p/> - * Alternatively you can run `ExampleNodeStartup` in another JVM which will - * start node with `examples/config/example-ignite.xml` configuration. - */ -object ScalarTaskExample extends App { - scalar("examples/config/example-ignite.xml") { - ignite$.compute().execute(classOf[IgniteHelloWorld], "Hello Cloud World!") - } - - /** - * This task encapsulates the logic of MapReduce. - */ - class IgniteHelloWorld extends ComputeTaskSplitAdapter[String, Void] { - def split(clusterSize: Int, arg: String): java.util.Collection[_ <: ComputeJob] = { - (for (w <- arg.split(" ")) yield toJob(() => println(w))).toSeq - } - - def reduce(results: util.List[ComputeJobResult]) = null - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarWorldShortestMapReduce.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarWorldShortestMapReduce.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarWorldShortestMapReduce.scala deleted file mode 100644 index 723cdae..0000000 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarWorldShortestMapReduce.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.scalar.examples - -import org.apache.ignite.scalar.scalar -import org.apache.ignite.scalar.scalar._ - -/** - * Shows the world's shortest MapReduce application that calculates non-space - * length of the input string. This example works equally on one computer or - * on thousands requiring no special configuration or deployment. - * <p/> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. - * <p/> - * Alternatively you can run `ExampleNodeStartup` in another JVM which will - * start node with `examples/config/example-ignite.xml` configuration. - */ -object ScalarWorldShortestMapReduce extends App { - scalar("examples/config/example-ignite.xml") { - val input = "World shortest mapreduce application" - - println("Non-space characters count: " + - ignite$.reduce$[Int, Int](for (w <- input.split(" ")) yield () => w.length, _.sum, null) - ) - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeAsyncExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeAsyncExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeAsyncExample.scala new file mode 100644 index 0000000..826e2c9 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeAsyncExample.scala @@ -0,0 +1,59 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.lang.IgniteRunnable +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +/** + * Demonstrates a simple use of [[IgniteRunnable]]. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeAsyncExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println("Compute asynchronous example started.") + + val compute = ignite$.compute().withAsync() + + "Print words using runnable".split(" ").map(word => { + compute.run(() => { + println() + println(">>> Printing '" + word + "' on this node from ignite job.") + }) + + compute.future[Void] + }).foreach(_.get()) + + println() + println(">>> Finished printing words using runnable execution.") + println(">>> Check all nodes for output (this node is also part of the cluster).") + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeBroadcastExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeBroadcastExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeBroadcastExample.scala new file mode 100644 index 0000000..a282cd2 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeBroadcastExample.scala @@ -0,0 +1,107 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.lang.IgniteCallable +import org.apache.ignite.resources.IgniteInstanceResource +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.{Ignite, IgniteException} + +import java.util.{Collection => JavaCollection} + +import scala.collection.JavaConversions._ + +/** + * Demonstrates broadcasting computations within cluster. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeBroadcastExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println(">>> Compute broadcast example started.") + + // Print hello message on all nodes. + hello(ignite$) + + // Gather system info from all nodes. + gatherSystemInfo(ignite$) + } + + /** + * Print 'Hello' message on all nodes. + * + * @param ignite Ignite instance. + * @throws IgniteException If failed. + */ + @throws(classOf[IgniteException]) + private def hello(ignite: Ignite) { + ignite.compute.broadcast(toRunnable(() => { + println() + println(">>> Hello Node! :)") + })) + + println() + println(">>> Check all nodes for hello message output.") + } + + /** + * Gather system info from all nodes and print it out. + * + * @param ignite Ignite instance. + * @throws IgniteException if failed. + */ + @throws(classOf[IgniteException]) + private def gatherSystemInfo(ignite: Ignite) { + val res: JavaCollection[String] = ignite.compute.broadcast(new IgniteCallable[String] { + @IgniteInstanceResource private var ignite: Ignite = null + + @impl def call: String = { + println() + println("Executing task on node: " + ignite.cluster.localNode.id) + + "Node ID: " + ignite.cluster.localNode.id + "\n" + + "OS: " + System.getProperty("os.name") + " " + System.getProperty("os.version") + " " + + System.getProperty("os.arch") + "\n" + + "User: " + System.getProperty("user.name") + "\n" + + "JRE: " + System.getProperty("java.runtime.name") + " " + System.getProperty("java.runtime.version") + } + }) + + println() + println("Nodes system information:") + println() + + for (r <- res) { + println(r) + println() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeCallableExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeCallableExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeCallableExample.scala new file mode 100644 index 0000000..86fa1bd --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeCallableExample.scala @@ -0,0 +1,73 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.lang.IgniteCallable +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import java.util.{ArrayList => JavaArrayList, Collection => JavaCollection} + +import scala.collection.JavaConversions._ + +/** + * Demonstrates using of [[IgniteCallable]] job execution on the cluster. + * <p> + * This example takes a sentence composed of multiple words and counts number of non-space + * characters in the sentence by having each compute job count characters in each individual + * word. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeCallableExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println(">>> Compute callable example started.") + + val calls: JavaCollection[IgniteCallable[Int]] = new JavaArrayList[IgniteCallable[Int]] + + // Iterate through all words in the sentence and create callable jobs. + "Count characters using callable".split(" ").map(word => { + calls.add(() => { + println() + println(">>> Printing '" + word + "' on this node from ignite job.") + + word.length + }) + }) + + // Execute collection of callables on the ignite. + val res = ignite$.compute.call(calls) + + val sum = res.sum + + println() + println(">>> Total number of characters in the phrase is '" + sum + "'.") + println(">>> Check all nodes for output (this node is also part of the cluster).") + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeClosureExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeClosureExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeClosureExample.scala new file mode 100644 index 0000000..f0abda0 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeClosureExample.scala @@ -0,0 +1,64 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import java.util.{Arrays} + +import scala.collection.JavaConversions._ + +/** + * Demonstrates a simple use of Ignite with reduce closure. + * <p> + * This example splits a phrase into collection of words, computes their length on different + * nodes and then computes total amount of non-whitespaces characters in the phrase. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeClosureExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println(">>> Compute closure example started.") + + // Execute closure on all cluster nodes. + val res = ignite$.compute().apply((word: String) => { + println() + println(">>> Printing '" + word + "' on this node from ignite job.") + + word.length + }, Arrays.asList("Count characters using closure".split(" "):_*)) + + var sum = res.sum + + println() + println(">>> Total number of characters in the phrase is '" + sum + "'.") + println(">>> Check all nodes for output (this node is also part of the cluster).") + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeContinuousMapperExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeContinuousMapperExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeContinuousMapperExample.scala new file mode 100644 index 0000000..a288bf6 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeContinuousMapperExample.scala @@ -0,0 +1,144 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid + +import org.apache.ignite.IgniteException +import org.apache.ignite.cluster.ClusterNode +import org.apache.ignite.compute._ +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.internal.util.lang.{GridFunc => F} +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.resources.TaskContinuousMapperResource +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger +import java.util.{Collections, Date, List => JavaList, Map => JavaMap, Queue => JavaQueue} + +/** + * Demonstrates usage of continuous mapper. With continuous mapper + * it is possible to continue mapping jobs asynchronously even after + * initial [[ComputeTask#map(List, Object)]] method completes. + * <p> + * String "Hello Continuous Mapper" is passed as an argument for execution + * of [[ContinuousMapperTask]]. As an outcome, participating + * nodes will print out a single word from the passed in string and return + * number of characters in that word. However, to demonstrate continuous + * mapping, next word will be mapped to a node only after the result from + * previous word has been received. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeContinuousMapperExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println(">>> Compute continuous mapper example started.") + + val phraseLen = ignite$.compute execute(classOf[ContinuousMapperTask], "Hello Continuous Mapper") + + println() + println(">>> Total number of characters in the phrase is '" + phraseLen + "'.") + } +} + +/** + * This task demonstrates how continuous mapper is used. The passed in phrase + * is split into multiple words and next word is sent out for processing only + * when the result for the previous word was received. + * <p> + * Note that annotation [[ComputeTaskNoResultCache]] is optional and tells Ignite + * not to accumulate results from individual jobs. In this example we increment + * total character count directly in [[ComputeTaskNoResultCache#result(ComputeJobResult, List)]] method, + * and therefore don't need to accumulate them be be processed at reduction step. + */ +@ComputeTaskNoResultCache private class ContinuousMapperTask extends ComputeTaskAdapter[String, Integer] { + /** This field will be injected with task continuous mapper. */ + @TaskContinuousMapperResource private var mapper: ComputeTaskContinuousMapper = null + + /** Word queue. */ + private final val words: JavaQueue[String] = new ConcurrentLinkedQueue[String] + + /** Total character count. */ + private final val totalChrCnt = new AtomicInteger(0) + + @impl def map(subgrid: JavaList[ClusterNode], phrase: String): JavaMap[_ <: ComputeJob, ClusterNode] = { + if (F.isEmpty(phrase)) + throw new IgniteException("Phrase is empty.") + + Collections.addAll[String](words, phrase.split(" "):_*) + + sendWord() + + null + } + + override def result(res: ComputeJobResult, rcvd: JavaList[ComputeJobResult]): ComputeJobResultPolicy = { + if (res.getException != null) + super.result(res, rcvd) + else { + totalChrCnt.addAndGet(res.getData[Integer]) + + sendWord() + + ComputeJobResultPolicy.WAIT + } + } + + @impl def reduce(results: JavaList[ComputeJobResult]): Integer = { + totalChrCnt.get + } + + /** + * Sends next queued word to the next node implicitly selected by load balancer. + */ + private def sendWord() { + val word = words.poll + + if (word != null) { + mapper.send(new ComputeJobAdapter(word) { + @impl def execute() = { + val word: String = argument(0) + + println() + println(">>> Printing '" + word + "' from ignite job at time: " + new Date()) + + val cnt = word.length + + try { + Thread.sleep(1000) + } + catch { + case ignored: InterruptedException => // No-op. + } + + Integer.valueOf(cnt) + } + }) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeFibonacciContinuationExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeFibonacciContinuationExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeFibonacciContinuationExample.scala new file mode 100644 index 0000000..9e12131 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeFibonacciContinuationExample.scala @@ -0,0 +1,159 @@ +package org.apache.ignite.scalar.examples.computegrid + +import org.apache.ignite.compute.ComputeJobContext +import org.apache.ignite.lang.{IgniteFuture, IgniteClosure} +import org.apache.ignite.resources.JobContextResource +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import org.jetbrains.annotations.Nullable + +import java.math.BigInteger +import java.util.UUID + +/** + * This example recursively calculates `Fibonacci` numbers on the ignite cluster. This is + * a powerful design pattern which allows for creation of fully distributively recursive + * (a.k.a. nested) tasks or closures with continuations. This example also shows + * usage of `continuations`, which allows us to wait for results from remote nodes + * without blocking threads. + * <p/> + * Note that because this example utilizes local node storage via `NodeLocal`, + * it gets faster if you execute it multiple times, as the more you execute it, + * the more values it will be cached on remote nodes. + * <p/> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p/> + * Alternatively you can run `ExampleNodeStartup` in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeFibonacciContinuationExample { + def main(args: Array[String]) { + scalar("examples/config/example-ignite.xml") { + // Calculate fibonacci for N. + val N = 100L + + val cluster = cluster$ + + val thisNode = cluster.localNode + + val start = System.currentTimeMillis + + // Group that excludes this node if others exists. + val prj = if (cluster.nodes().size() > 1) cluster.forOthers(thisNode) else cluster.forNode(thisNode) + + val fib = ignite$.compute(prj).apply(new FibonacciClosure(thisNode.id()), N) + + val duration = System.currentTimeMillis - start + + println(">>>") + println(">>> Finished executing Fibonacci for '" + N + "' in " + duration + " ms.") + println(">>> Fibonacci sequence for input number '" + N + "' is '" + fib + "'.") + println(">>> You should see prints out every recursive Fibonacci execution on cluster nodes.") + println(">>> Check remote nodes for output.") + println(">>>") + } + } +} + +/** + * Closure to execute. + * + * @param excludeNodeId Node to exclude from execution if there are more then 1 node in cluster. + */ +class FibonacciClosure (private[this] val excludeNodeId: UUID) extends IgniteClosure[Long, BigInteger] { + // These fields must be *transient* so they do not get + // serialized and sent to remote nodes. + // However, these fields will be preserved locally while + // this closure is being "held", i.e. while it is suspended + // and is waiting to be continued. + @transient private var fut1, fut2: IgniteFuture[BigInteger] = null + + // Auto-inject job context. + @JobContextResource + private val jobCtx: ComputeJobContext = null + + @Nullable override def apply(num: Long): BigInteger = { + if (fut1 == null || fut2 == null) { + println(">>> Starting fibonacci execution for number: " + num) + + val cluster = cluster$ + + // Make sure n is not negative. + val n = math.abs(num) + + if (n <= 2) + return if (n == 0) + BigInteger.ZERO + else + BigInteger.ONE + + // Get properly typed node-local storage. + val store = cluster.nodeLocalMap[Long, IgniteFuture[BigInteger]]() + + // Check if value is cached in node-local store first. + fut1 = store.get(n - 1) + fut2 = store.get(n - 2) + + val excludeNode = cluster.node(excludeNodeId) + + // Group that excludes node with id passed in constructor if others exists. + val prj = if (cluster.nodes().size() > 1) cluster.forOthers(excludeNode) else cluster.forNode(excludeNode) + + val comp = ignite$.compute(prj).withAsync() + + // If future is not cached in node-local store, cache it. + // Note recursive execution! + if (fut1 == null) { + comp.apply(new FibonacciClosure(excludeNodeId), n - 1) + + val futVal = comp.future[BigInteger]() + + fut1 = store.putIfAbsent(n - 1, futVal) + + if (fut1 == null) + fut1 = futVal + } + + // If future is not cached in node-local store, cache it. + if (fut2 == null) { + comp.apply(new FibonacciClosure(excludeNodeId), n - 2) + + val futVal = comp.future[BigInteger]() + + fut2 = store.putIfAbsent(n - 2, futVal) + + if (fut2 == null) + fut2 = futVal + } + + // If futures are not done, then wait asynchronously for the result + if (!fut1.isDone || !fut2.isDone) { + val lsnr = (fut: IgniteFuture[BigInteger]) => { + // This method will be called twice, once for each future. + // On the second call - we have to have both futures to be done + // - therefore we can call the continuation. + if (fut1.isDone && fut2.isDone) + jobCtx.callcc() // Resume job execution. + } + + // Hold (suspend) job execution. + // It will be resumed in listener above via 'callcc()' call + // once both futures are done. + jobCtx.holdcc() + + // Attach the same listener to both futures. + fut1.listen(lsnr) + fut2.listen(lsnr) + + return null + } + } + + assert(fut1.isDone && fut2.isDone) + + // Return cached results. + fut1.get.add(fut2.get) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeReduceExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeReduceExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeReduceExample.scala new file mode 100644 index 0000000..1d91e68 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeReduceExample.scala @@ -0,0 +1,83 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.lang.IgniteReducer +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +/** + * Demonstrates a simple use of Ignite with reduce closure. + * <p> + * Phrase is split into words and distributed across nodes where length of each word is + * calculated. Then total phrase length is calculated using reducer. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start Ignite node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeReduceExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println("Compute reducer example started.") + + val sum = ignite$.compute.apply(toClosure((word: String) => { + println() + println(">>> Printing '" + word + "' on this node from ignite job.") + + // Return number of letters in the word. + word.length + }), + // Job parameters. Ignite will create as many jobs as there are parameters. + Arrays.asList("Count characters using reducer".split(" "):_*), + + // Reducer to process results as they come. + new IgniteReducer[Int, Int] { + private val sum = new AtomicInteger + + // Callback for every job result. + @impl def collect(len: Int): Boolean = { + sum.addAndGet(len) + + // Return true to continue waiting until all results are received. + true + } + + // Reduce all results into one. + @impl def reduce: Int = { + sum.get + } + }) + + println() + println(">>> Total number of characters in the phrase is '" + sum + "'.") + println(">>> Check all nodes for output (this node is also part of the cluster).") + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeRunnableExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeRunnableExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeRunnableExample.scala new file mode 100644 index 0000000..453ad08 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeRunnableExample.scala @@ -0,0 +1,58 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.lang.IgniteRunnable +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +/** + * Demonstrates a simple use of [[IgniteRunnable]]. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeRunnableExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println("Compute runnable example started.") + + val compute = ignite$.compute + + // Iterate through all words in the sentence and create runnable jobs. + "Print words using runnable".split(" ").foreach(word => { + compute.run(() => { + println() + println(">>> Printing '" + word + "' on this node from ignite job.") + }) + }) + + println() + println(">>> Finished printing words using runnable execution.") + println(">>> Check all nodes for output (this node is also part of the cluster).") + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeTaskMapExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeTaskMapExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeTaskMapExample.scala new file mode 100644 index 0000000..8de1324 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeTaskMapExample.scala @@ -0,0 +1,108 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid + +import org.apache.ignite.cluster.ClusterNode +import org.apache.ignite.compute.{ComputeJob, ComputeJobAdapter, ComputeJobResult, ComputeTaskAdapter} +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import com.sun.istack.internal.Nullable + +import java.util.{HashMap => JavaHashMap, List => JavaList, Map => JavaMap} + +import scala.collection.JavaConversions._ + +/** + * Demonstrates a simple use of Ignite with + * [[ComputeTaskAdapter]]. + * <p> + * Phrase passed as task argument is split into words on map stage and distributed among cluster nodes. + * Each node computes word length and returns result to master node where total phrase length is + * calculated on reduce stage. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeTaskMapExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println("Compute task map example started.") + + // Execute task on the cluster and wait for its completion. + val cnt = ignite$.compute.execute(classOf[CharacterCountTask], "Hello Ignite Enabled World!") + + println() + println(">>> Total number of characters in the phrase is '" + cnt + "'.") + println(">>> Check all nodes for output (this node is also part of the cluster).") + } +} + +/** + * Task to count non-white-space characters in a phrase. + */ +private class CharacterCountTask extends ComputeTaskAdapter[String, Integer] { + /** + * Splits the received string to words, creates a child job for each word, and sends + * these jobs to other nodes for processing. Each such job simply prints out the received word. + * + * @param nodes Nodes available for this task execution. + * @param arg String to split into words for processing. + * @return Map of jobs to nodes. + */ + @impl def map(nodes: JavaList[ClusterNode], arg: String): JavaMap[_ <: ComputeJob, ClusterNode] = { + val words = arg.split(" ") + + val map = new JavaHashMap[ComputeJob, ClusterNode] + + var it = nodes.iterator + + for (word <- words) { + if (!it.hasNext) + it = nodes.iterator + + val node = it.next() + + map.put(new ComputeJobAdapter { + @Nullable @impl def execute: Object = { + println() + println(">>> Printing '" + word + "' on this node from ignite job.") + + Integer.valueOf(word.length) + } + }, node) + } + + map + } + + @impl def reduce(results: JavaList[ComputeJobResult]): Integer = { + results.foldLeft(0)((_, res) => res.getData[Integer]) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeTaskSplitExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeTaskSplitExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeTaskSplitExample.scala new file mode 100644 index 0000000..fcb7418 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/ScalarComputeTaskSplitExample.scala @@ -0,0 +1,97 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid + +import org.apache.ignite.compute.{ComputeJob, ComputeJobAdapter, ComputeJobResult, ComputeTaskSplitAdapter} +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import org.jetbrains.annotations.Nullable + +import java.util.{Collection => JavaCollection, LinkedList => JavaLinkedList, List => JavaList} + +import scala.collection.JavaConversions._ + +/** + * Demonstrates a simple use of Ignite with [[ComputeTaskSplitAdapter]]. + * <p> + * Phrase passed as task argument is split into jobs each taking one word. Then jobs are distributed among + * cluster nodes. Each node computes word length and returns result to master node where total phrase length + * is calculated on reduce stage. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeTaskSplitExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println("Compute task split example started.") + + // Execute task on the cluster and wait for its completion. + val cnt = ignite$.compute.execute(classOf[CharacterCountTask], "Hello Ignite Enabled World!") + + println() + println(">>> Total number of characters in the phrase is '" + cnt + "'.") + println(">>> Check all nodes for output (this node is also part of the cluster).") + } + + /** + * Task to count non-white-space characters in a phrase. + */ + private class CharacterCountTask extends ComputeTaskSplitAdapter[String, Integer] { + /** + * Splits the received string to words, creates a child job for each word, and sends + * these jobs to other nodes for processing. Each such job simply prints out the received word. + * + * @param clusterSize Number of available cluster nodes. Note that returned number of + * jobs can be less, equal or greater than this cluster size. + * @param arg Task execution argument. Can be { @code null}. + * @return The list of child jobs. + */ + protected def split(clusterSize: Int, arg: String): JavaCollection[_ <: ComputeJob] = { + val jobs: JavaCollection[ComputeJob] = new JavaLinkedList[ComputeJob] + + for (word <- arg.split(" ")) { + jobs.add(new ComputeJobAdapter { + @Nullable @impl def execute: Object = { + println() + println(">>> Printing '" + word + "' on this node from ignite job.") + + word.length.asInstanceOf[Object] + } + }) + } + + jobs + } + + @impl def reduce(results: JavaList[ComputeJobResult]): Integer = { + results.foldLeft(0)(_ + _.getData[Integer]) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/cluster/ScalarComputeClusterGroupExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/cluster/ScalarComputeClusterGroupExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/cluster/ScalarComputeClusterGroupExample.scala new file mode 100644 index 0000000..afbed23 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/cluster/ScalarComputeClusterGroupExample.scala @@ -0,0 +1,88 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid.cluster + +import org.apache.ignite.cluster.{ClusterGroup, ClusterNode} +import org.apache.ignite.examples.{ExampleNodeStartup, ExamplesUtils} +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.lang.IgnitePredicate +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.{Ignite, IgniteException} + +/** + * Demonstrates new functional APIs. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeClusterGroupExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + val cluster = cluster$ + + if (ExamplesUtils.checkMinTopologySize(cluster, 2)) { + println() + println("Compute example started.") + + + // Say hello to all nodes in the cluster, including local node. + sayHello(ignite$, cluster) + + // Say hello to all remote nodes. + sayHello(ignite$, cluster.forRemotes) + + // Pick random node out of remote nodes. + val randomNode = cluster.forRemotes.forRandom + + // Say hello to a random node. + sayHello(ignite$, randomNode) + + // Say hello to all nodes residing on the same host with random node. + sayHello(ignite$, cluster.forHost(randomNode.node)) + + // Say hello to all nodes that have current CPU load less than 50%. + sayHello(ignite$, cluster.forPredicate(new IgnitePredicate[ClusterNode] { + @impl def apply(n: ClusterNode): Boolean = { + n.metrics.getCurrentCpuLoad < 0.5 + } + })) + } + } + + /** + * Print 'Hello' message on remote nodes. + * + * @param ignite Ignite. + * @param grp Cluster group. + * @throws IgniteException If failed. + */ + @throws(classOf[IgniteException]) + private def sayHello(ignite: Ignite, grp: ClusterGroup) { + ignite.compute(grp).broadcast(toRunnable(() => { + println(">>> Hello Node: " + grp.ignite.cluster.localNode.id) + })) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/failover/ScalarComputeFailoverExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/failover/ScalarComputeFailoverExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/failover/ScalarComputeFailoverExample.scala new file mode 100644 index 0000000..987511e --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/failover/ScalarComputeFailoverExample.scala @@ -0,0 +1,124 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.computegrid.failover + +import org.apache.ignite.IgniteLogger +import org.apache.ignite.compute.{ComputeJobFailoverException, ComputeTaskSession, ComputeTaskSessionFullSupport} +import org.apache.ignite.examples.ExamplesUtils +import org.apache.ignite.examples.computegrid.failover.ComputeFailoverNodeStartup +import org.apache.ignite.lang.{IgniteBiTuple, IgniteClosure} +import org.apache.ignite.resources.{LoggerResource, TaskSessionResource} +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +/** + * Demonstrates the usage of checkpoints in Ignite. + * <p> + * The example tries to compute phrase length. In order to mitigate possible node failures, intermediate + * result is saved as as checkpoint after each job step. + * <p> + * Remote nodes must be started using [[ComputeFailoverNodeStartup]]. + */ +object ScalarComputeFailoverExample extends App { + scalar(ComputeFailoverNodeStartup.configuration()) { + if (ExamplesUtils.checkMinTopologySize(cluster$, 2)) { + println() + println("Compute failover example started.") + + // Number of letters. + val charCnt: Int = ignite$.compute.apply(new CheckPointJob, "Stage1 Stage2") + + println() + println(">>> Finished executing fail-over example with checkpoints.") + println(">>> Total number of characters in the phrase is '" + charCnt + "'.") + println(">>> You should see exception stack trace from failed job on some node.") + println(">>> Failed job will be failed over to another node.") + } + } +} + +@ComputeTaskSessionFullSupport private final class CheckPointJob extends IgniteClosure[String, Integer] { + /** Injected distributed task session. */ + @TaskSessionResource private var jobSes: ComputeTaskSession = null + + /** Injected ignite logger. */ + @LoggerResource private var log: IgniteLogger = null + + /** */ + private var state: IgniteBiTuple[Integer, Integer] = null + + /** */ + private var phrase: String = null + + /** + * The job will check the checkpoint with key `'fail'` and if + * it's `true` it will throw exception to simulate a failure. + * Otherwise, it will execute enabled method. + */ + def apply(phrase: String): Integer = { + println() + println(">>> Executing fail-over example job.") + + this.phrase = phrase + + val words = Seq(phrase.split(" "):_*) + + val cpKey = checkpointKey + + val state: IgniteBiTuple[Integer, Integer] = jobSes.loadCheckpoint(cpKey) + + var idx = 0 + + var sum = 0 + + if (state != null) { + this.state = state + + idx = state.get1 + + sum = state.get2 + } + + for (i <- idx until words.size) { + sum += words(i).length + + this.state = new IgniteBiTuple[Integer, Integer](i + 1, sum) + + jobSes.saveCheckpoint(cpKey, this.state) + + if (i == 0) { + println() + println(">>> Job will be failed over to another node.") + + throw new ComputeJobFailoverException("Expected example job exception.") + } + } + sum + } + + /** + * Make reasonably unique checkpoint key. + * + * @return Checkpoint key. + */ + private def checkpointKey: String = { + getClass.getName + '-' + phrase + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/montecarlo/ScalarComputeCreditRiskExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/montecarlo/ScalarComputeCreditRiskExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/montecarlo/ScalarComputeCreditRiskExample.scala new file mode 100644 index 0000000..5dafe9f --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/computegrid/montecarlo/ScalarComputeCreditRiskExample.scala @@ -0,0 +1,231 @@ +package org.apache.ignite.scalar.examples.computegrid.montecarlo + +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import scala.util.Random +import scala.util.control.Breaks._ + +/** + * Scalar-based Monte-Carlo example. + * <p/> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p/> + * Alternatively you can run `ExampleNodeStartup` in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeCreditRiskExample { + def main(args: Array[String]) { + scalar("examples/config/example-ignite.xml") { + // Create portfolio. + var portfolio = Seq.empty[Credit] + + val rnd = new Random + + // Generate some test portfolio items. + (0 until 5000).foreach(i => + portfolio +:= Credit( + 50000 * rnd.nextDouble, + rnd.nextInt(1000), + rnd.nextDouble / 10, + rnd.nextDouble / 20 + 0.02 + ) + ) + + // Forecast horizon in days. + val horizon = 365 + + // Number of Monte-Carlo iterations. + val iter = 10000 + + // Percentile. + val percentile = 0.95 + + // Mark the stopwatch. + val start = System.currentTimeMillis + + // Calculate credit risk and print it out. + // As you can see the ignite cluster enabling is completely hidden from the caller + // and it is fully transparent to him. In fact, the caller is never directly + // aware if method was executed just locally or on the 100s of cluster nodes. + // Credit risk crdRisk is the minimal amount that creditor has to have + // available to cover possible defaults. + val crdRisk = ignite$ @< (closures(cluster$.nodes().size(), portfolio, horizon, iter, percentile), + (s: Seq[Double]) => s.sum / s.size, null) + + println("Credit risk [crdRisk=" + crdRisk + ", duration=" + + (System.currentTimeMillis - start) + "ms]") + } + } + + /** + * Creates closures for calculating credit risks. + * + * @param clusterSize Size of the cluster. + * @param portfolio Portfolio. + * @param horizon Forecast horizon in days. + * @param iter Number of Monte-Carlo iterations. + * @param percentile Percentile. + * @return Collection of closures. + */ + private def closures(clusterSize: Int, portfolio: Seq[Credit], horizon: Int, iter: Int, + percentile: Double): Seq[() => Double] = { + val iterPerNode = math.round(iter / clusterSize.asInstanceOf[Float]) + val lastNodeIter = iter - (clusterSize - 1) * iterPerNode + + var cls = Seq.empty[() => Double] + + (0 until clusterSize).foreach(i => { + val nodeIter = if (i == clusterSize - 1) lastNodeIter else iterPerNode + + cls +:= (() => new CreditRiskManager().calculateCreditRiskMonteCarlo( + portfolio, horizon, nodeIter, percentile)) + }) + + cls + } +} + +/** + * This class provides a simple model for a credit contract (or a loan). It is basically + * defines as remaining crediting amount to date, credit remaining term, APR and annual + * probability on default. Although this model is simplified for the purpose + * of this example, it is close enough to emulate the real-life credit + * risk assessment application. + * + * @param remAmnt Remaining crediting amount. + * @param remTerm Remaining crediting remTerm. + * @param apr Annual percentage rate (APR). + * @param edf Expected annual probability of default (EaDF). + */ +private case class Credit(remAmnt: Double, remTerm: Int, apr: Double, edf: Double) { + /** + * Gets either credit probability of default for the given period of time + * if remaining term is less than crediting time or probability of default + * for whole remained crediting time. + * + * @param term Default term. + * @return Credit probability of default in relative percents + * (percentage / 100). + */ + def getDefaultProbability(term: Int): Double = { + 1 - math.exp(math.log(1 - edf) * math.min(remTerm, term) / 365.0) + } +} + +/** + * This class abstracts out the calculation of risk for a credit portfolio. + */ +private class CreditRiskManager { + /** + * Default randomizer with normal distribution. + * Note that since every JVM on the ignite cluster will have its own random + * generator (independently initialized) the Monte-Carlo simulation + * will be slightly skewed when performed on the ignite cluster due to skewed + * normal distribution of the sub-jobs comparing to execution on the + * local node only with single random generator. Real-life applications + * may want to provide its own implementation of distributed random + * generator. + */ + private val rndGen = new Random + + /** + * Calculates credit risk for a given credit portfolio. This calculation uses + * Monte-Carlo Simulation to produce risk value. + * + * @param portfolio Credit portfolio. + * @param horizon Forecast horizon (in days). + * @param num Number of Monte-Carlo iterations. + * @param percentile Cutoff level. + * @return Credit risk value, i.e. the minimal amount that creditor has to + * have available to cover possible defaults. + */ + def calculateCreditRiskMonteCarlo(portfolio: Seq[Credit], horizon: Int, num: Int, percentile: Double): Double = { + println(">>> Calculating credit risk for portfolio [size=" + portfolio.length + ", horizon=" + + horizon + ", percentile=" + percentile + ", iterations=" + num + "] <<<") + + val start = System.currentTimeMillis + + val losses = calculateLosses(portfolio, horizon, num).sorted + val lossProbs = new Array[Double](losses.length) + + (0 until losses.length).foreach(i => { + if (i == 0) + lossProbs(i) = getLossProbability(losses, 0) + else if (losses(i) != losses(i - 1)) + lossProbs(i) = getLossProbability(losses, i) + lossProbs(i - 1) + else + lossProbs(i) = lossProbs(i - 1) + }) + + var crdRisk = 0.0 + + breakable { + (0 until lossProbs.length).foreach(i => { + if (lossProbs(i) > percentile) { + crdRisk = losses(i - 1) + + break() + } + }) + } + + println(">>> Finished calculating portfolio risk [risk=" + crdRisk + + ", time=" + (System.currentTimeMillis - start) + "ms]") + + crdRisk + } + + /** + * Calculates losses for the given credit portfolio using Monte-Carlo Simulation. + * Simulates probability of default only. + * + * @param portfolio Credit portfolio. + * @param horizon Forecast horizon. + * @param num Number of Monte-Carlo iterations. + * @return Losses array simulated by Monte Carlo method. + */ + private def calculateLosses(portfolio: Seq[Credit], horizon: Int, num: Int): Array[Double] = { + val losses = new Array[Double](num) + + // Count losses using Monte-Carlo method. We generate random probability of default, + // if it exceeds certain credit default value we count losses - otherwise count income. + (0 until num).foreach(i => { + portfolio.foreach(crd => { + val remDays = math.min(crd.remTerm, horizon) + + if (rndGen.nextDouble >= 1 - crd.getDefaultProbability(remDays)) + // (1 + 'r' * min(H, W) / 365) * S. + // Where W is a horizon, H is a remaining crediting term, 'r' is an annual credit rate, + // S is a remaining credit amount. + losses(i) += (1 + crd.apr * math.min(horizon, crd.remTerm) / 365) * crd.remAmnt + else + // - 'r' * min(H,W) / 365 * S + // Where W is a horizon, H is a remaining crediting term, 'r' is a annual credit rate, + // S is a remaining credit amount. + losses(i) -= crd.apr * math.min(horizon, crd.remTerm) / 365 * crd.remAmnt + }) + }) + + losses + } + + /** + * Calculates probability of certain loss in array of losses. + * + * @param losses Array of losses. + * @param i Index of certain loss in array. + * @return Probability of loss with given index. + */ + private def getLossProbability(losses: Array[Double], i: Int): Double = { + var count = 0.0 + + losses.foreach(tmp => { + if (tmp == losses(i)) + count += 1 + }) + + count / losses.length + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/ScalarCacheAffinityExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/ScalarCacheAffinityExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/ScalarCacheAffinityExample.scala new file mode 100644 index 0000000..65bea2b --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/ScalarCacheAffinityExample.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.scalar.examples.datagrid + +import org.apache.ignite.IgniteCache +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import scala.collection.JavaConversions._ + +/** + * This example demonstrates the simplest code that populates the distributed cache + * and co-locates simple closure execution with each key. The goal of this particular + * example is to provide the simplest code example of this logic. + * <p/> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p/> + * Alternatively you can run `ExampleNodeStartup` in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarCacheAffinityExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Name of cache. */ + private val NAME = ScalarCacheAffinityExample.getClass.getSimpleName + + /** Number of keys. */ + private val KEY_CNT = 20 + + /** Type alias. */ + type Cache = IgniteCache[Int, String] + + /* + * Note that in case of `LOCAL` configuration, + * since there is no distribution, values may come back as `nulls`. + */ + scalar(CONFIG) { + val cache = createCache$[Int, String](NAME) + + try { + populate (cache) + + visitUsingAffinityRun(cache) + + visitUsingMapKeysToNodes(cache) + } + finally { + cache.close() + } + } + + /** + * Visits every in-memory data ignite entry on the remote node it resides by co-locating visiting + * closure with the cache key. + * + * @param c Cache to use. + */ + private def visitUsingAffinityRun(c: IgniteCache[Int, String]) { + (0 until KEY_CNT).foreach (i => + ignite$.compute ().affinityRun (NAME, i, + () => println ("Co-located using affinityRun [key= " + i + ", value=" + c.localPeek (i) + ']') ) + ) + } + + /** + * Collocates jobs with keys they need to work. + * + * @param c Cache to use. + */ + private def visitUsingMapKeysToNodes(c: IgniteCache[Int, String]) { + val keys = 0 until KEY_CNT + + val cluster = cluster$ + + // Map all keys to nodes. + val mappings = cluster.mapKeysToNodes(NAME, keys) + + mappings.foreach(mapping => { + val node = mapping._1 + val mappedKeys = mapping._2 + + if (node != null) { + cluster.forNode(node) *< (() => { + // Check cache without loading the value. + mappedKeys.foreach(key => println("Co-located using mapKeysToNodes [key= " + key + + ", value=" + c.localPeek(key) + ']')) + }, null) + } + }) + } + + /** + * Populates given cache. + * + * @param c Cache to populate. + */ + private def populate(c: Cache) { + (0 until KEY_CNT).foreach(i => c += (i -> i.toString)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/ScalarCacheApiExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/ScalarCacheApiExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/ScalarCacheApiExample.scala new file mode 100644 index 0000000..a589e40 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/ScalarCacheApiExample.scala @@ -0,0 +1,101 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.datagrid + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.{IgniteCache, IgniteException} + +import javax.cache.processor.{EntryProcessor, MutableEntry} +import java.util.concurrent.ConcurrentMap + +/** + * This example demonstrates some of the cache rich API capabilities. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarCacheApiExample extends App{ + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Cache name. */ + private val CACHE_NAME = ScalarCacheApiExample.getClass.getSimpleName + + scalar(CONFIG) { + println() + println(">>> Cache API example started.") + + val cache = createCache$[Integer, String](CACHE_NAME) + + try { + atomicMapOperations(cache) + } + finally { + if (cache != null) cache.close() + } + } + + /** + * Demonstrates cache operations similar to [[ConcurrentMap]] API. Note that + * cache API is a lot richer than the JDK [[ConcurrentMap]]. + * + * @throws IgniteException If failed. + */ + @throws(classOf[IgniteException]) + private def atomicMapOperations(cache: IgniteCache[Integer, String]) { + println() + println(">>> Cache atomic map operation examples.") + + val v = cache.getAndPut(1, "1") + + assert(v == null) + + cache.put(2, "2") + + var b1 = cache.putIfAbsent(4, "4") + var b2 = cache.putIfAbsent(4, "44") + + assert(b1 && !b2) + + cache.put(6, "6") + cache.invoke(6, new EntryProcessor[Integer, String, AnyRef] { + @impl def process(entry: MutableEntry[Integer, String], args: AnyRef*): AnyRef = { + val v = entry.getValue + + entry.setValue(v + "6") + + null + } + }) + + cache.put(7, "7") + + b1 = cache.replace(7, "7", "77") + b2 = cache.replace(7, "7", "777") + + assert(b1 & !b2) + } +}