Repository: incubator-ignite Updated Branches: refs/heads/ignite-596 [created] 41a5bf67b
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingExample.scala new file mode 100644 index 0000000..51788d1 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingExample.scala @@ -0,0 +1,166 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.messaging + +import org.apache.ignite.examples.{ExampleNodeStartup, ExamplesUtils} +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.lang.IgniteBiPredicate +import org.apache.ignite.resources.IgniteInstanceResource +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.{Ignite, IgniteException, IgniteMessaging} + +import java.util.UUID +import java.util.concurrent.CountDownLatch + +/** + * Example that demonstrates how to exchange messages between nodes. Use such + * functionality for cases when you need to communicate to other nodes outside + * of ignite task. + * <p> + * To run this example you must have at least one remote node started. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat}` examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarMessagingExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Number of messages. */ + private val MESSAGES_NUM = 10 + + /** Message topics. */ + private object TOPIC extends Enumeration { + type TOPIC = Value + val ORDERED, UNORDERED = Value + } + + scalar(CONFIG) { + val cluster = cluster$ + + if (ExamplesUtils.checkMinTopologySize(cluster, 2)) { + println() + println(">>> Messaging example started.") + + // Group for remote nodes. + val rmts = cluster.forRemotes + + // Listen for messages from remote nodes to make sure that they received all the messages. + val msgCnt = rmts.nodes.size * MESSAGES_NUM + + val orderedLatch = new CountDownLatch(msgCnt) + val unorderedLatch = new CountDownLatch(msgCnt) + + localListen(ignite$.message(cluster.forLocal), orderedLatch, unorderedLatch) + + // Register listeners on all cluster nodes. + startListening(ignite$.message(rmts)) + + // Send unordered messages to all remote nodes. + for (i <- 0 until MESSAGES_NUM) + ignite$.message(rmts).send(TOPIC.UNORDERED, Integer.toString(i)) + + println(">>> Finished sending unordered messages.") + + // Send ordered messages to all remote nodes. + for (i <- 0 until MESSAGES_NUM) + ignite$.message(rmts).sendOrdered(TOPIC.ORDERED, Integer.toString(i), 0) + + println(">>> Finished sending ordered messages.") + println(">>> Check output on all nodes for message printouts.") + println(">>> Will wait for messages acknowledgements from all remote nodes.") + + orderedLatch.await() + unorderedLatch.await() + + println(">>> Messaging example finished.") + } + } + + /** + * Start listening to messages on remote cluster nodes. + * + * @param msg Ignite messaging. + */ + private def startListening(msg: IgniteMessaging) { + msg.remoteListen(TOPIC.ORDERED, new IgniteBiPredicate[UUID, String] { + @IgniteInstanceResource private var ignite: Ignite = null + + @impl def apply(nodeId: UUID, msg: String): Boolean = { + println("Received ordered message [msg=" + msg + ", fromNodeId=" + nodeId + ']') + + try { + ignite.message(ignite.cluster.forNodeId(nodeId)).send(TOPIC.ORDERED, msg) + } + catch { + case e: IgniteException => + e.printStackTrace() + } + + true + } + }) + msg.remoteListen(TOPIC.UNORDERED, new IgniteBiPredicate[UUID, String] { + @IgniteInstanceResource private var ignite: Ignite = null + + @impl def apply(nodeId: UUID, msg: String): Boolean = { + println("Received unordered message [msg=" + msg + ", fromNodeId=" + nodeId + ']') + + try { + ignite.message(ignite.cluster.forNodeId(nodeId)).send(TOPIC.UNORDERED, msg) + } + catch { + case e: IgniteException => + e.printStackTrace() + } + + true + } + }) + } + + /** + * Listen for messages from remote nodes. + * + * @param msg Ignite messaging. + * @param orderedLatch Latch for ordered messages acks. + * @param unorderedLatch Latch for unordered messages acks. + */ + private def localListen(msg: IgniteMessaging, orderedLatch: CountDownLatch, unorderedLatch: CountDownLatch) { + msg.localListen(TOPIC.ORDERED, new IgniteBiPredicate[UUID, String] { + @impl def apply(nodeId: UUID, msg: String): Boolean = { + orderedLatch.countDown() + + orderedLatch.getCount > 0 + } + }) + msg.localListen(TOPIC.UNORDERED, new IgniteBiPredicate[UUID, String] { + @impl def apply(nodeId: UUID, msg: String): Boolean = { + unorderedLatch.countDown() + + unorderedLatch.getCount > 0 + } + }) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingPingPongExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingPingPongExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingPingPongExample.scala new file mode 100644 index 0000000..cd37590 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingPingPongExample.scala @@ -0,0 +1,179 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.messaging + +import org.apache.ignite.IgniteException +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.lang.IgniteBiPredicate +import org.apache.ignite.messaging.MessagingListenActor +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import java.util.UUID +import java.util.concurrent.CountDownLatch + +/** + * Demonstrates simple protocol-based exchange in playing a ping-pong between + * two nodes. It is analogous to `MessagingPingPongExample` on Java side. + * <p/> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p/> + * Alternatively you can run `ExampleNodeStartup` in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarMessagingPingPongExample extends App { + scalar("examples/config/example-ignite.xml") { + pingPong() + } + + /** + * Implements Ping Pong example between local and remote node. + */ + def pingPong() { + val g = ignite$ + + val cluster = cluster$ + + if (cluster.nodes().size < 2) { + println(">>>") + println(">>> I need a partner to play a ping pong!") + println(">>>") + } + else { + // Pick first remote node as a partner. + val nodeB = cluster.forNode(g.remoteNodes$().head) + + // Set up remote player: configure remote node 'rmt' to listen + // for messages from local node 'loc'. + g.message(nodeB).remoteListen(null, new IgniteBiPredicate[UUID, String] { + @impl def apply(nodeId: UUID, rcvMsg: String): Boolean = { + println("Received message [msg=" + rcvMsg + ", sender=" + nodeId + ']') + + rcvMsg match { + case "PING" => + ignite$.message(cluster$.forNodeId(nodeId)).send(null, "PONG") + + true + case _ => + false + } + } + }) + + val latch = new CountDownLatch(10) + + // Set up local player: configure local node 'loc' + // to listen for messages from remote node 'rmt'. + ignite$.message().localListen(null, new IgniteBiPredicate[UUID, String] { + @impl def apply(nodeId: UUID, rcvMsg: String): Boolean = { + println("Received message [msg=" + rcvMsg + ", sender=" + nodeId + ']') + + if (latch.getCount == 1) { + ignite$.message(cluster$.forNodeId(nodeId)).send(null, "STOP") + + latch.countDown() + + return false + } + else if ("PONG" == rcvMsg) + ignite$.message(cluster.forNodeId(nodeId)).send(null, "PING") + else + throw new IgniteException("Received unexpected message: " + rcvMsg) + + latch.countDown() + + true + } + }) + + // Serve! + nodeB.send$("PING", null) + + // Wait til the match is over. + latch.await() + } + } + + /** + * Implements Ping Pong example between two remote nodes. + */ + def pingPong2() { + val g = ignite$ + + val cluster = cluster$ + + if (cluster.forRemotes().nodes().size() < 2) { + println(">>>") + println(">>> I need at least two remote nodes!") + println(">>>") + } + else { + // Pick two remote nodes. + val n1 = cluster.forRemotes().head + val n2 = cluster.forRemotes().tail.head + + val n1p = cluster.forNode(n1) + val n2p = cluster.forNode(n2) + + // Configure remote node 'n1' to receive messages from 'n2'. + g.message(n1p).remoteListen(null, new MessagingListenActor[String] { + def receive(nid: UUID, msg: String) { + println(msg) + + msg match { + case "PING" => respond("PONG") + case "STOP" => stop() + } + } + }) + + // Configure remote node 'n2' to receive messages from 'n1'. + g.message(n2p).remoteListen(null, new MessagingListenActor[String] { + // Get local count down latch. + private lazy val latch: CountDownLatch = cluster.nodeLocalMap().get("latch") + + def receive(nid: UUID, msg: String) { + println(msg) + + latch.getCount match { + case 1 => stop("STOP") + case _ => respond("PING") + } + + latch.countDown() + } + }) + + // 1. Sets latch into node local storage so that local actor could use it. + // 2. Sends first 'PING' to 'n1'. + // 3. Waits until all messages are exchanged between two remote nodes. + n2p.run$(() => { + val latch = new CountDownLatch(10) + + cluster.nodeLocalMap[String, CountDownLatch].put("latch", latch) + + n1p.send$("PING", null) + + latch.await() + }, null) + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingPingPongListenActorExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingPingPongListenActorExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingPingPongListenActorExample.scala new file mode 100644 index 0000000..1cb2bc2 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/messaging/ScalarMessagingPingPongListenActorExample.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.scalar.examples.messaging + +import org.apache.ignite.messaging.MessagingListenActor +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import java.util.UUID +import java.util.concurrent.CountDownLatch + +/** + * Demonstrates simple protocol-based exchange in playing a ping-pong between + * two nodes. It is analogous to `MessagingPingPongExample` on Java side. + * <p/> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p/> + * Alternatively you can run `ExampleNodeStartup` in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarMessagingPingPongListenActorExample extends App { + scalar("examples/config/example-ignite.xml") { + pingPong() + } + + /** + * Implements Ping Pong example between local and remote node. + */ + def pingPong() { + val g = ignite$ + + val cluster = cluster$ + + if (cluster.nodes().size < 2) { + println(">>>") + println(">>> I need a partner to play a ping pong!") + println(">>>") + } + else { + // Pick first remote node as a partner. + val nodeB = cluster.forNode(g.remoteNodes$().head) + + // Set up remote player: configure remote node 'rmt' to listen + // for messages from local node 'loc'. + g.message(nodeB).remoteListen(null, new MessagingListenActor[String]() { + def receive(nodeId: UUID, msg: String) { + println("Received message [msg=" + msg + ", sender=" + nodeId + ']') + + msg match { + case "PING" => respond("PONG") + case "STOP" => stop() + } + } + }) + + val latch = new CountDownLatch(10) + + // Set up local player: configure local node 'loc' + // to listen for messages from remote node 'rmt'. + ignite$.message().localListen(null, new MessagingListenActor[String]() { + def receive(nodeId: UUID, msg: String) { + println("Received message [msg=" + msg + ", sender=" + nodeId + ']') + + if (latch.getCount == 1) + stop("STOP") + else // We know it's 'PONG'. + respond("PING") + + latch.countDown() + } + }) + + // Serve! + nodeB.send$("PING", null) + + // Wait til the match is over. + latch.await() + } + } + + /** + * Implements Ping Pong example between two remote nodes. + */ + def pingPong2() { + val g = ignite$ + + val cluster = cluster$ + + if (cluster.forRemotes().nodes().size() < 2) { + println(">>>") + println(">>> I need at least two remote nodes!") + println(">>>") + } + else { + // Pick two remote nodes. + val n1 = cluster.forRemotes().head + val n2 = cluster.forRemotes().tail.head + + val n1p = cluster.forNode(n1) + val n2p = cluster.forNode(n2) + + // Configure remote node 'n1' to receive messages from 'n2'. + g.message(n1p).remoteListen(null, new MessagingListenActor[String] { + def receive(nid: UUID, msg: String) { + println(msg) + + msg match { + case "PING" => respond("PONG") + case "STOP" => stop() + } + } + }) + + // Configure remote node 'n2' to receive messages from 'n1'. + g.message(n2p).remoteListen(null, new MessagingListenActor[String] { + // Get local count down latch. + private lazy val latch: CountDownLatch = cluster.nodeLocalMap().get("latch") + + def receive(nid: UUID, msg: String) { + println(msg) + + latch.getCount match { + case 1 => stop("STOP") + case _ => respond("PING") + } + + latch.countDown() + } + }) + + // 1. Sets latch into node local storage so that local actor could use it. + // 2. Sends first 'PING' to 'n1'. + // 3. Waits until all messages are exchanged between two remote nodes. + n2p.run$(() => { + val latch = new CountDownLatch(10) + + cluster.nodeLocalMap[String, CountDownLatch].put("latch", latch) + + n1p.send$("PING", null) + + latch.await() + }, null) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/client/memcache/ScalarMemcacheRestExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/client/memcache/ScalarMemcacheRestExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/client/memcache/ScalarMemcacheRestExample.scala new file mode 100644 index 0000000..d8d6487 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/client/memcache/ScalarMemcacheRestExample.scala @@ -0,0 +1,120 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.misc.client.memcache + +import org.apache.ignite.examples.misc.client.memcache.MemcacheRestExampleNodeStartup +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import net.spy.memcached.{BinaryConnectionFactory, MemcachedClient} + +import java.io.IOException +import java.net.InetSocketAddress +import java.util.Arrays + +/** + * This example shows how to use Memcache client for manipulating Ignite cache. + * <p> + * Ignite implements Memcache binary protocol and it is available if + * REST is enabled on the node. + * Remote nodes should always be started using [[MemcacheRestExampleNodeStartup]]. + */ +object ScalarMemcacheRestExample extends App { + /** Cache name. */ + private val CACHE_NAME = ScalarMemcacheRestExample.getClass.getSimpleName + + /** Hostname for client connection. */ + private val host = "localhost" + + /** Port number for client connection. */ + private val port = 11211 + + scalar(MemcacheRestExampleNodeStartup.configuration()) { + var client: MemcachedClient = null + + try { + println() + println(">>> Memcache REST example started.") + + val cache = createCache$[String, AnyRef](CACHE_NAME) + + try { + client = startMemcachedClient(host, port) + + if (client.add("strKey", 0, "strVal").get) + println(">>> Successfully put string value using Memcache client.") + + println(">>> Getting value for 'strKey' using Ignite cache API: " + cache.get("strKey")) + println(">>> Getting value for 'strKey' using Memcache client: " + client.get("strKey")) + + if (client.delete("strKey").get) + println(">>> Successfully removed string value using Memcache client.") + + println(">>> Current cache size: " + cache.size() + " (expected: 0).") + + if (client.add("intKey", 0, 100).get) + println(">>> Successfully put integer value using Memcache client.") + + println(">>> Getting value for 'intKey' using Ignite cache API: " + cache.get("intKey")) + println(">>> Getting value for 'intKey' using Memcache client: " + client.get("intKey")) + + if (client.delete("intKey").get) + println(">>> Successfully removed integer value using Memcache client.") + + println(">>> Current cache size: " + cache.size() + " (expected: 0).") + + val l = long$("atomicLong", true, 10) + + if (client.incr("atomicLong", 5, 0) == 15) + println(">>> Successfully incremented atomic long by 5.") + + println(">>> New atomic long value: " + l.incrementAndGet + " (expected: 16).") + + if (client.decr("atomicLong", 3, 0) == 13) + println(">>> Successfully decremented atomic long by 3.") + + println(">>> New atomic long value: " + l.decrementAndGet + " (expected: 12).") + } + finally { + cache.close() + } + } + finally { + if (client != null) + client.shutdown() + } + } + + /** + * Creates Memcache client that uses binary protocol and connects to Ignite. + * + * @param host Hostname. + * @param port Port number. + * @return Client. + * @throws IOException If connection failed. + */ + @throws(classOf[IOException]) + private def startMemcachedClient(host: String, port: Int): MemcachedClient = { + assert(host != null) + assert(port > 0) + + new MemcachedClient(new BinaryConnectionFactory, Arrays.asList(new InetSocketAddress(host, port))) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/deployment/ScalarDeploymentExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/deployment/ScalarDeploymentExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/deployment/ScalarDeploymentExample.scala new file mode 100644 index 0000000..504f3db --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/deployment/ScalarDeploymentExample.scala @@ -0,0 +1,126 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.misc.deployment + +import org.apache.ignite.IgniteCompute +import org.apache.ignite.compute._ +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import org.jetbrains.annotations.Nullable + +import java.util.{ArrayList => JavaArrayList, Collection => JavaCollection, List => JavaList} + +import scala.collection.JavaConversions._ + +/** + * Demonstrates how to explicitly deploy a task. Note that + * it is very rare when you would need such functionality as tasks are + * auto-deployed on demand first time you execute them. So in most cases + * you would just apply any of the `Ignite.execute(...)` methods directly. + * However, sometimes a task is not in local class path, so you may not even + * know the code it will execute, but you still need to execute it. For example, + * you have two independent components in the system, and one loads the task + * classes from some external source and deploys it; then another component + * can execute it just knowing the name of the task. + * <p> + * Also note that for simplicity of the example, the task we execute is + * in system classpath, so even in this case the deployment step is unnecessary. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarDeploymentExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Name of the deployed task. */ + private[deployment] final val TASK_NAME = "ExampleTask" + + scalar(CONFIG) { + println() + println(">>> Deployment example started.") + + // This task will be deployed on local node and then peer-loaded + // onto remote nodes on demand. For this example this task is + // available on the classpath, however in real life that may not + // always be the case. In those cases you should use explicit + // 'IgniteCompute.localDeployTask(Class, ClassLoader) apply and + // then use 'IgniteCompute.execute(String, Object)' method + // passing your task name as first parameter. + ignite$.compute.localDeployTask(classOf[ExampleTask], classOf[ExampleTask].getClassLoader) + + for (e <- ignite$.compute.localTasks.entrySet) + println(">>> Found locally deployed task [alias=" + e.getKey + ", taskCls=" + e.getValue) + + // Execute the task passing its name as a parameter. The system will find + // the deployed task by its name and execute it. + ignite$.compute.execute(TASK_NAME, null) + + // Execute the task passing class name as a parameter. The system will find + // the deployed task by its class name and execute it. + // g.compute().execute(ExampleTask.class.getName(), null).get(); + // Undeploy task + ignite$.compute.undeployTask(TASK_NAME) + + println() + println(">>> Finished executing Ignite Direct Deployment Example.") + println(">>> Check participating nodes output.") + } + + /** + * Example task used to demonstrate direct task deployment through API. + * For this example this task as available on the classpath, however + * in real life that may not always be the case. In those cases + * you should use explicit [[IgniteCompute#localDeployTask(Class, ClassLoader)]] apply and + * then use [[IgniteCompute#execute(String, Object)]] + * method passing your task name as first parameter. + * <p> + * Note that this task specifies explicit task name. Task name is optional + * and is added here for demonstration purpose. If not provided, it will + * default to the task class name. + */ + @ComputeTaskName(TASK_NAME) class ExampleTask extends ComputeTaskSplitAdapter[String, AnyRef] { + @impl protected def split(clusterSize: Int, arg: String): JavaCollection[_ <: ComputeJob] = { + val jobs = new JavaArrayList[ComputeJob](clusterSize) + + for (i <- 0 until clusterSize) { + jobs.add(new ComputeJobAdapter { + @Nullable @impl override def execute(): AnyRef = { + println(">>> Executing deployment example job on this node.") + + null + } + }) + } + + jobs + } + + @impl def reduce(results: JavaList[ComputeJobResult]): AnyRef = { + null + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/lifecycle/ScalarLifecycleExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/lifecycle/ScalarLifecycleExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/lifecycle/ScalarLifecycleExample.scala new file mode 100644 index 0000000..60e97b3 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/lifecycle/ScalarLifecycleExample.scala @@ -0,0 +1,90 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.misc.lifecycle + +import org.apache.ignite.configuration.IgniteConfiguration +import org.apache.ignite.examples.misc.lifecycle.LifecycleExample.LifecycleExampleBean +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.lifecycle.LifecycleEventType._ +import org.apache.ignite.lifecycle.{LifecycleBean, LifecycleEventType} +import org.apache.ignite.resources.IgniteInstanceResource +import org.apache.ignite.{Ignite, Ignition} + +/** + * This example shows how to provide your own [[LifecycleBean]] implementation + * to be able to hook into Ignite lifecycle. The [[LifecycleExampleBean]] bean + * will output occurred lifecycle events to the console. + * <p> + * This example does not require remote nodes to be started. + */ +object ScalarLifecycleExample extends App { + println() + println(">>> Lifecycle example started.") + + // Create new configuration. + val cfg = new IgniteConfiguration + + val bean = new LifecycleExampleBean + + // Provide lifecycle bean to configuration. + cfg.setLifecycleBeans(bean) + + val ignite = Ignition.start(cfg) + + try { + assert(bean.isStarted) + } + finally { + if (ignite != null) + ignite.close() + } + + // Make sure that lifecycle bean was notified about ignite stop. + assert(!bean.isStarted) + + /** + * Simple [[LifecycleBean]] implementation that outputs event type when it is occurred. + */ + class LifecycleExampleBean extends LifecycleBean { + /** Auto-inject ignite instance. */ + @IgniteInstanceResource private var ignite: Ignite = null + + /** Started flag. */ + private var isStartedInt = false + + @impl def onLifecycleEvent(evt: LifecycleEventType) { + println() + println(">>> Lifecycle event occurred: " + evt) + println(">>> Ignite name: " + ignite.name) + + if (evt eq AFTER_NODE_START) + isStartedInt = true + else if (evt eq AFTER_NODE_STOP) + isStartedInt = false + } + + /** + * @return `True` if ignite has been started. + */ + def isStarted: Boolean = { + isStartedInt + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/schedule/ScalarComputeScheduleExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/schedule/ScalarComputeScheduleExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/schedule/ScalarComputeScheduleExample.scala new file mode 100644 index 0000000..e3ab621 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/schedule/ScalarComputeScheduleExample.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.scalar.examples.misc.schedule + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.lang.IgniteRunnable +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import java.lang.{Integer => JavaInt} +import java.util.concurrent.Callable + +/** + * Demonstrates a cron-based [[Runnable]] execution scheduling. + * Test runnable object broadcasts a phrase to all cluster nodes every minute + * three times with initial scheduling delay equal to five seconds. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarComputeScheduleExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println("Compute schedule example started.") + + // Schedule output message every minute. + val fut = ignite$.scheduler.scheduleLocal(new Callable[JavaInt] { + private var invocations = 0 + + @impl def call(): JavaInt = { + invocations += 1 + + ignite$.compute.broadcast(new IgniteRunnable { + @impl def run() { + println() + println("Howdy! :)") + } + }) + + invocations + } + }, "{5, 3} * * * * *") + + while (!fut.isDone) + println(">>> Invocation #: " + fut.get) + + println() + println(">>> Schedule future is done and has been unscheduled.") + println(">>> Check all nodes for hello message output.") + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/springbean/ScalarSpringBeanExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/springbean/ScalarSpringBeanExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/springbean/ScalarSpringBeanExample.scala new file mode 100644 index 0000000..ea20a3c --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/springbean/ScalarSpringBeanExample.scala @@ -0,0 +1,80 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.misc.springbean + +import org.apache.ignite.Ignite +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.internal.util.scala.impl + +import org.springframework.context.support.ClassPathXmlApplicationContext + +import java.util.concurrent.Callable +import java.util.{ArrayList => JavaArrayList, Collection => JavaCollection} + +/** + * Demonstrates a simple use of Ignite configured with Spring. + * <p> + * String "Hello World." is printed out by Callable passed into + * the executor service provided by Ignite. This statement could be printed + * out on any node in the cluster. + * <p> + * The major point of this example is to show ignite injection by Spring + * framework. Ignite bean is described in `spring-bean.xml` file and instantiated + * by Spring context. Once application completed its execution Spring will + * apply ignite bean destructor and stop the ignite. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node + * with `examples/config/example-ignite.xml` configuration. + */ +object ScalarSpringBeanExample extends App { + println() + println(">>> Spring bean example started.") + + // Initialize Spring factory. + val ctx = new ClassPathXmlApplicationContext("org/apache/ignite/examples/misc/springbean/spring-bean.xml") + + try { + val ignite = ctx.getBean("mySpringBean").asInstanceOf[Ignite] + + val exec = ignite.executorService + + val res = exec.submit(new Callable[String] { + @impl def call: String = { + println("Hello world!") + + null + } + }) + + res.get + + println(">>>") + println(">>> Finished executing Ignite \"Spring bean\" example.") + println(">>> You should see printed out of 'Hello world' on one of the nodes.") + println(">>> Check all nodes for output (this node is also part of the cluster).") + println(">>>") + } + finally { + ctx.destroy() + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/springbean/spring-bean.xml ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/springbean/spring-bean.xml b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/springbean/spring-bean.xml new file mode 100644 index 0000000..8569ac5 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/misc/springbean/spring-bean.xml @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + ~ /* + ~ * Licensed to the Apache Software Foundation (ASF) under one or more + ~ * contributor license agreements. See the NOTICE file distributed with + ~ * this work for additional information regarding copyright ownership. + ~ * The ASF licenses this file to You under the Apache License, Version 2.0 + ~ * (the "License"); you may not use this file except in compliance with + ~ * the License. You may obtain a copy of the License at + ~ * + ~ * http://www.apache.org/licenses/LICENSE-2.0 + ~ * + ~ * Unless required by applicable law or agreed to in writing, software + ~ * distributed under the License is distributed on an "AS IS" BASIS, + ~ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ * See the License for the specific language governing permissions and + ~ * limitations under the License. + ~ */ + --> + +<!-- + Command line (default) ignite configuration. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> + <description>Main Spring file for ignite configuration.</description> + + <!-- Example of bean definition with given configuration. --> + <bean id="mySpringBean" class="org.apache.ignite.IgniteSpringBean"> + <property name="configuration"> + <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + <!-- Set to true to enable ignite-aware class loading for examples, default is false. --> + <property name="peerClassLoadingEnabled" value="true"/> + + <property name="marshaller"> + <bean class="org.apache.ignite.marshaller.optimized.OptimizedMarshaller"> + <!-- Set to false to allow non-serializable objects in examples, default is true. --> + <property name="requireSerializable" value="false"/> + </bean> + </property> + + <!-- Enable task execution events for examples. --> + <property name="includeEventTypes"> + <util:constant static-field="org.apache.ignite.events.EventType.EVTS_TASK_EXECUTION"/> + </property> + + <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> + <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> + <property name="addresses"> + <list> + <!-- In distributed environment, replace with actual host IP address. --> + <value>127.0.0.1:47500..47509</value> + </list> + </property> + </bean> + </property> + </bean> + </property> + </bean> + </property> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/servicegrid/ScalarServicesExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/servicegrid/ScalarServicesExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/servicegrid/ScalarServicesExample.scala new file mode 100644 index 0000000..5845a2d --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/servicegrid/ScalarServicesExample.scala @@ -0,0 +1,147 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.servicegrid + +import org.apache.ignite.examples.servicegrid.{SimpleMapService, SimpleMapServiceImpl} +import org.apache.ignite.examples.{ExampleNodeStartup, ExamplesUtils} +import org.apache.ignite.lang.IgniteCallable +import org.apache.ignite.resources.ServiceResource +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.{Ignite, Ignition} + +import java.util.{Collection => JavaCollection} + +import scala.collection.JavaConversions._ + +/** + * Example that demonstrates how to deploy distributed services in Ignite. + * Distributed services are especially useful when deploying singletons on the ignite, + * be that cluster-singleton, or per-node-singleton, etc... + * <p> + * To start remote nodes, you must run [[ExampleNodeStartup]] in another JVM + * which will start node with `examples/config/example-ignite.xml` configuration. + * <p> + * NOTE:<br/> + * Starting `ignite.sh` directly will not work, as distributed services + * cannot be peer-deployed and classes must be on the classpath for every node. + */ +object ScalarServicesExample extends App{ + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + // Mark this node as client node. + Ignition.setClientMode(true) + + scalar(CONFIG) { + if (ExamplesUtils.hasServerNodes(ignite$)) { + val svcs = ignite$.services(cluster$.forServers) + + try { + svcs.deployClusterSingleton("myClusterSingletonService", new SimpleMapServiceImpl) + + svcs.deployNodeSingleton("myNodeSingletonService", new SimpleMapServiceImpl) + + svcs.deployMultiple("myMultiService", new SimpleMapServiceImpl, 2, 0) + + serviceProxyExample(ignite$) + + serviceInjectionExample(ignite$) + } + finally { + ignite$.services.cancelAll() + } + } + } + + /** + * Simple example to demonstrate service proxy invocation of a remotely deployed service. + * + * @param ignite Ignite instance. + * @throws Exception If failed. + */ + @throws(classOf[Exception]) + private def serviceProxyExample(ignite: Ignite) { + println(">>>") + println(">>> Starting service proxy example.") + println(">>>") + + val mapSvc: SimpleMapService[Integer, String] = ignite.services.serviceProxy("myNodeSingletonService", + classOf[SimpleMapService[_, _]], true) + + val cnt = 10 + + for (i <- 0 until cnt) + mapSvc.put(i, Integer.toString(i)) + + val mapSize = mapSvc.size + + println("Map service size: " + mapSize) + + if (mapSize != cnt) throw new Exception("Invalid map size [expected=" + cnt + ", actual=" + mapSize + ']') + } + + /** + * Simple example to demonstrate how to inject service proxy into distributed closures. + * + * @param ignite Ignite instance. + * @throws Exception If failed. + */ + @throws(classOf[Exception]) + private def serviceInjectionExample(ignite: Ignite) { + println(">>>") + println(">>> Starting service injection example.") + println(">>>") + + val mapSvc: SimpleMapService[Integer, String] = ignite.services.serviceProxy("myClusterSingletonService", + classOf[SimpleMapService[_, _]], true) + + val cnt = 10 + + for (i <- 0 until cnt) + mapSvc.put(i, Integer.toString(i)) + + val mapSizes: JavaCollection[Integer] = ignite.compute.broadcast(new SimpleClosure) + + println("Closure execution result: " + mapSizes) + + + for (mapSize <- mapSizes) if (mapSize != cnt) throw new Exception("Invalid map size [expected=" + cnt + + ", actual=" + mapSize + ']') + } + + /** + * Simple closure to demonstrate auto-injection of the service proxy. + */ + private class SimpleClosure extends IgniteCallable[Integer] { + @ServiceResource(serviceName = "myClusterSingletonService", proxyInterface = classOf[SimpleMapService[_, _]]) + @transient + private val mapSvc: SimpleMapService[_, _] = null + + @throws(classOf[Exception]) + def call: Integer = { + val mapSize = mapSvc.size + + println("Executing closure [mapSize=" + mapSize + ']') + + mapSize + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/streaming/ScalarStreamTransformerExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/streaming/ScalarStreamTransformerExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/streaming/ScalarStreamTransformerExample.scala new file mode 100644 index 0000000..aeb0fce --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/streaming/ScalarStreamTransformerExample.scala @@ -0,0 +1,104 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.streaming + +import org.apache.ignite.Ignition +import org.apache.ignite.cache.query.SqlFieldsQuery +import org.apache.ignite.examples.{ExampleNodeStartup, ExamplesUtils} +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.stream.StreamTransformer + +import javax.cache.processor.MutableEntry +import java.lang.{Long => JavaLong} +import java.util.Random + +/** + * Stream random numbers into the streaming cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using [[ExampleNodeStartup]] or by starting remote nodes as specified below.</li> + * <li>Start streaming using [[ScalarStreamTransformerExample]].</li> + * </ul> + * <p> + * You should start remote nodes by running [[ExampleNodeStartup]] in another JVM. + */ +object ScalarStreamTransformerExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Random number generator. */ + private val RAND = new Random + + /** Range within which to generate numbers. */ + private val RANGE = 1000 + + scalar(CONFIG) { + Ignition.setClientMode(true) + + if (ExamplesUtils.hasServerNodes(ignite$)) { + val stmCache = createCache$[Integer, JavaLong]("randomNumbers", + indexedTypes = Seq(classOf[Integer], classOf[JavaLong])) + + try { + val stmr = dataStreamer$[Integer, JavaLong](stmCache.getName) + + try { + stmr.allowOverwrite(true) + + stmr.receiver(new StreamTransformer[Integer, JavaLong] { + @impl def process(e: MutableEntry[Integer, JavaLong], args: AnyRef*): AnyRef = { + val v = e.getValue + + e.setValue(if (v == null) + 1L + else + v + 1) + + null + } + }) + + for (i <- 0 until 1000000) { + stmr.addData(RAND.nextInt(RANGE), 1L) + + if (i % 500000 == 0) + println("Number of tuples streamed into Ignite: " + i) + } + } + finally { + stmr.close() + } + + val top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10") + + val top10 = stmCache.query(top10Qry).getAll + + println("Top 10 most popular numbers:") + + ExamplesUtils.printQueryResults(top10) + } + finally { + stmCache.close() + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/streaming/ScalarStreamVisitorExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/streaming/ScalarStreamVisitorExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/streaming/ScalarStreamVisitorExample.scala new file mode 100644 index 0000000..d9aeba0 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/streaming/ScalarStreamVisitorExample.scala @@ -0,0 +1,160 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.streaming + +import org.apache.ignite.cache.query.SqlFieldsQuery +import org.apache.ignite.cache.query.annotations.QuerySqlField +import org.apache.ignite.examples.{ExampleNodeStartup, ExamplesUtils} +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.stream.StreamVisitor +import org.apache.ignite.{IgniteCache, Ignition} + +import java.io.Serializable +import java.util.{Map => JavaMap, Random} + + +/** + * Stream random numbers into the streaming cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using [[ExampleNodeStartup]] or by starting remote nodes as specified below.</li> + * <li>Start streaming using [[ScalarStreamVisitorExample]].</li> + * </ul> + * <p> + * You should start remote nodes by running [[ExampleNodeStartup]] in another JVM. + */ +object ScalarStreamVisitorExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Random number generator. */ + private val RAND = new Random + + /** The list of instruments. */ + private val INSTRUMENTS = Array("IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT") + + /** The list of initial instrument prices. */ + private val INITIAL_PRICES = Array(194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50) + + scalar(CONFIG) { + Ignition.setClientMode(true) + + if (ExamplesUtils.hasServerNodes(ignite$)) { + val mktCache = createCache$[String, Double]("marketTicks") + + try { + val instCache = createCache$[String, Instrument]("instCache", + indexedTypes = Seq(classOf[String], classOf[Instrument])) + + try { + val mktStmr = ignite$.dataStreamer[String, Double](mktCache.getName) + + try { + mktStmr.receiver(new StreamVisitor[String, Double] { + @impl def apply(cache: IgniteCache[String, Double], e: JavaMap.Entry[String, Double]) { + val symbol = e.getKey + val tick = e.getValue + var inst = instCache.get(symbol) + + if (inst == null) + inst = new Instrument(symbol) + + inst.update(tick) + + instCache.put(symbol, inst) + } + }) + + for (i <- 0 until 10000000) { + val idx = RAND.nextInt(INSTRUMENTS.length) + + val price = round2(INITIAL_PRICES(idx) + RAND.nextGaussian) + + mktStmr.addData(INSTRUMENTS(idx), price) + + if (i % 500000 == 0) + println("Number of tuples streamed into Ignite: " + i) + } + } + finally { + if (mktStmr != null) mktStmr.close() + } + + val top3qry = new SqlFieldsQuery("select symbol, " + + "(latest - open) from Instrument order by (latest - open) desc limit 3") + + val top3 = instCache.query(top3qry).getAll + + println("Top performing financial instruments: ") + + ExamplesUtils.printQueryResults(top3) + } + finally { + if (instCache != null) instCache.close() + } + } + finally { + mktCache.close() + } + } + } + + /** + * Rounds double value to two significant signs. + * + * @param value value to be rounded. + * @return rounded double value. + */ + private def round2(value: Double): Double = { + Math.floor(100 * value + 0.5) / 100 + } + +} + + +/** + * Financial instrument. + * + * @param symb Instrument symbol. + */ +class Instrument(symb: String) extends Serializable { + /** Instrument symbol. */ + @QuerySqlField(index = true) private final val symbol: String = symb + + /** Open price. */ + @QuerySqlField(index = true) private var open: Double = .0 + + /** Close price. */ + @QuerySqlField(index = true) private var latest: Double = .0 + + /** + * Updates this instrument based on the latest market tick price. + * + * @param price Latest price. + */ + def update(price: Double) { + if (open == 0) + open = price + + this.latest = price + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala ---------------------------------------------------------------------- diff --git a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala index 52d5ce1..8ba50c3 100644 --- a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala +++ b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala @@ -17,9 +17,34 @@ package org.apache.ignite.scalar.tests.examples -import org.apache.ignite.scalar.examples._ +import org.apache.ignite.configuration.IgniteConfiguration +import org.apache.ignite.examples.computegrid.failover.ComputeFailoverNodeStartup +import org.apache.ignite.examples.misc.client.memcache.MemcacheRestExampleNodeStartup +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.scalar.examples.computegrid._ +import org.apache.ignite.scalar.examples.computegrid.cluster.ScalarComputeClusterGroupExample +import org.apache.ignite.scalar.examples.computegrid.failover.ScalarComputeFailoverExample +import org.apache.ignite.scalar.examples.computegrid.montecarlo.ScalarComputeCreditRiskExample +import org.apache.ignite.scalar.examples.datagrid._ +import org.apache.ignite.scalar.examples.datagrid.hibernate.ScalarHibernateL2CacheExample +import org.apache.ignite.scalar.examples.datagrid.starschema.ScalarStarSchemaExample +import org.apache.ignite.scalar.examples.datagrid.store.dummy.ScalarCacheDummyStoreExample +import org.apache.ignite.scalar.examples.datagrid.store.hibernate.ScalarCacheHibernateStoreExample +import org.apache.ignite.scalar.examples.datagrid.store.jdbc.ScalarCacheJdbcStoreExample +import org.apache.ignite.scalar.examples.datastructures._ +import org.apache.ignite.scalar.examples.events.ScalarEventsExample +import org.apache.ignite.scalar.examples.igfs.ScalarIgfsExample +import org.apache.ignite.scalar.examples.messaging.{ScalarMessagingExample, ScalarMessagingPingPongListenActorExample} +import org.apache.ignite.scalar.examples.misc.client.memcache.ScalarMemcacheRestExample +import org.apache.ignite.scalar.examples.misc.deployment.ScalarDeploymentExample +import org.apache.ignite.scalar.examples.misc.lifecycle.ScalarLifecycleExample +import org.apache.ignite.scalar.examples.misc.schedule.ScalarComputeScheduleExample +import org.apache.ignite.scalar.examples.misc.springbean.ScalarSpringBeanExample +import org.apache.ignite.scalar.examples.servicegrid.ScalarServicesExample +import org.apache.ignite.scalar.examples.streaming.{ScalarStreamTransformerExample, ScalarStreamVisitorExample} import org.apache.ignite.scalar.scalar import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest + import org.scalatest.junit.JUnitSuiteLike /** @@ -29,14 +54,154 @@ class ScalarExamplesSelfTest extends GridAbstractExamplesTest with JUnitSuiteLik /** */ private def EMPTY_ARGS = Array.empty[String] + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + private def runWithNode(cfgPath: String, f: () => Unit) { + val ignite = IgnitionEx.start(cfgPath, "secondNode") + + try { + f() + } + finally { + ignite.close() + } + } + + private def runWithNode(cfg: IgniteConfiguration, f: () => Unit) { + cfg.setGridName("secondNode") + + scalar(cfg) { + f() + } + } + /** Compute grid examples */ + + /** */ + def testScalarComputeClusterGroupExample() { + runWithNode(CONFIG, () => ScalarComputeClusterGroupExample.main(EMPTY_ARGS)) + } + + /** */ + def testScalarComputeAsyncExample() { + ScalarComputeAsyncExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeBroadcastExample() { + ScalarComputeBroadcastExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeCallableExample() { + ScalarComputeCallableExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeClosureExample() { + ScalarComputeClosureExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeContinuousMapperExample() { + ScalarComputeContinuousMapperExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeFibonacciContinuationExample() { + ScalarComputeFibonacciContinuationExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeReduceExample() { + ScalarComputeReduceExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeRunnableExample() { + ScalarComputeRunnableExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeTaskMapExample() { + ScalarComputeTaskMapExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeTaskSplitExample() { + ScalarComputeTaskSplitExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeCreditRiskExample() { + ScalarComputeCreditRiskExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeFailoverExample() { + runWithNode(ComputeFailoverNodeStartup.configuration(), () => ScalarComputeFailoverExample.main(EMPTY_ARGS)) + } + + /** Data grid example */ + + /** */ + def testScalarHibernateL2CacheExample() { + ScalarHibernateL2CacheExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarStarSchemaExample() { + ScalarStarSchemaExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarCacheDummyStoreExample() { + ScalarCacheDummyStoreExample.main(EMPTY_ARGS) + } + /** */ - def testScalarCacheAffinitySimpleExample() { + def testScalarCacheHibernateStoreExample() { + ScalarCacheHibernateStoreExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarCacheJdbcStoreExample() { + ScalarCacheJdbcStoreExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarCacheAffinityExample() { ScalarCacheAffinityExample.main(EMPTY_ARGS) } /** */ - def testScalarCacheExample() { - ScalarCacheExample.main(EMPTY_ARGS) + def testScalarCacheApiExample() { + ScalarCacheApiExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarCacheAsyncApiExample() { + ScalarCacheAsyncApiExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarCacheContinuousQueryExample() { + ScalarCacheContinuousQueryExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarCacheDataStreamerExample() { + ScalarCacheDataStreamerExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarCacheEventsExample() { + ScalarCacheEventsExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarCachePutGetExample() { + ScalarCachePutGetExample.main(EMPTY_ARGS) } /** */ @@ -45,54 +210,117 @@ class ScalarExamplesSelfTest extends GridAbstractExamplesTest with JUnitSuiteLik } /** */ - def testScalarClosureExample() { - ScalarClosureExample.main(EMPTY_ARGS) + def testScalarCacheTransactionExample() { + ScalarCacheTransactionExample.main(EMPTY_ARGS) } + /** Data structures examples */ + /** */ - def testScalarContinuationExample() { - ScalarContinuationExample.main(EMPTY_ARGS) + def testScalarExecutorServiceExample() { + ScalarExecutorServiceExample.main(EMPTY_ARGS) } /** */ - def testScalarCreditRiskExample() { - ScalarCreditRiskExample.main(EMPTY_ARGS) + def testScalarIgniteAtomicLongExample() { + ScalarIgniteAtomicLongExampleStartup.main(EMPTY_ARGS) } /** */ - def testScalarPingPongExample() { - scalar("modules/scalar/src/test/resources/spring-ping-pong-partner.xml") { - ScalarPingPongExample.main(EMPTY_ARGS) - } + def testScalarIgniteAtomicSequenceExample() { + ScalarIgniteAtomicSequenceExampleStartup.main(EMPTY_ARGS) } /** */ - def testScalarPopularNumbersRealTimeExample() { - ScalarCachePopularNumbersExample.main(EMPTY_ARGS) + def testScalarIgniteAtomicStampedExample() { + ScalarIgniteAtomicStampedExample.main(EMPTY_ARGS) } /** */ - def testScalarPrimeExample() { - ScalarPrimeExample.main(EMPTY_ARGS) + def testScalarIgniteCountDownLatchExample() { + ScalarIgniteCountDownLatchExample.main(EMPTY_ARGS) } /** */ - def testScalarScheduleCallableExample() { - ScalarScheduleExample.main(EMPTY_ARGS) + def testScalarIgniteQueueExample() { + ScalarIgniteQueueExampleStartup.main(EMPTY_ARGS) } /** */ - def testScalarTaskExample() { - ScalarTaskExample.main(EMPTY_ARGS) + def testScalarIgniteSetExample() { + ScalarIgniteSetExample.main(EMPTY_ARGS) } + /** Event examples */ + + /** */ + def testScalarEventsExample() { + ScalarEventsExample.main(EMPTY_ARGS) + } + + /** IGFS examples */ + + /** */ + def testScalarIgfsExample() { + runWithNode("examples/config/filesystem/example-igfs.xml", () => ScalarIgfsExample.main(EMPTY_ARGS)) + } + + /** Messaging examples */ + + /** */ + def testScalarMessagingExample() { + runWithNode(CONFIG, () => ScalarMessagingExample.main(EMPTY_ARGS)) + } + + /** */ + def testScalarMessagingPingPongListenActorExample() { + runWithNode("modules/scalar/src/test/resources/spring-ping-pong-partner.xml", + () => ScalarMessagingPingPongListenActorExample.main(EMPTY_ARGS)) + } + + /** Misc examples */ + + /** */ + def testScalarDeploymentExample() { + ScalarDeploymentExample.main(EMPTY_ARGS) + } + + /** */ + def testMemcacheRestExampleNode() { + runWithNode(MemcacheRestExampleNodeStartup.configuration(), () => ScalarMemcacheRestExample.main(EMPTY_ARGS)) + } + + /** */ + def testScalarLifecycleExample() { + ScalarLifecycleExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarComputeScheduleExample() { + ScalarComputeScheduleExample.main(EMPTY_ARGS) + } + + /** */ + def testScalarSpringBeanExample() { + ScalarSpringBeanExample.main(EMPTY_ARGS) + } + + /** Service grid examples */ + + /** */ + def testScalarServicesExample() { + ScalarServicesExample.main(EMPTY_ARGS) + } + + /** Streaming examples */ + /** */ - def testScalarWorldShortestMapReduceExample() { - ScalarWorldShortestMapReduce.main(EMPTY_ARGS) + def testScalarStreamTransformerExample() { + ScalarStreamTransformerExample.main(EMPTY_ARGS) } /** */ - def testScalarSnowflakeSchemaExample() { - ScalarSnowflakeSchemaExample.main(EMPTY_ARGS) + def testScalarStreamVisitorExample() { + ScalarStreamVisitorExample.main(EMPTY_ARGS) } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala b/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala index c5bc085..60cfd4f 100644 --- a/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala +++ b/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala @@ -18,11 +18,15 @@ package org.apache.ignite.scalar import org.apache.ignite._ -import org.apache.ignite.cache.CacheMode +import org.apache.ignite.cache.CacheAtomicityMode._ +import org.apache.ignite.cache.CacheMode._ +import org.apache.ignite.cache.{CacheAtomicityMode, CacheMode} import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField} import org.apache.ignite.cluster.ClusterNode -import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} +import org.apache.ignite.configuration.{CollectionConfiguration, CacheConfiguration, IgniteConfiguration} +import org.apache.ignite.configuration.TransactionConfiguration._ import org.apache.ignite.internal.IgniteVersionUtils._ +import org.apache.ignite.transactions.{TransactionIsolation, TransactionConcurrency} import org.jetbrains.annotations.Nullable import java.net.URL @@ -275,22 +279,31 @@ object scalar extends ScalarConversions { Option(Ignition.ignite.cache(cacheName)) /** - * Creates cache cache with specified parameters in default grid. + * Creates cache with specified parameters in default grid. * * @param cacheName Name of the cache to get. */ @inline def createCache$[K, V](@Nullable cacheName: String, cacheMode: CacheMode = CacheMode.PARTITIONED, - indexedTypes: Seq[Class[_]] = Seq.empty): IgniteCache[K, V] = { + indexedTypes: Seq[Class[_]] = Seq.empty, + @Nullable atomicityMode: CacheAtomicityMode = null): IgniteCache[K, V] = { val cfg = new CacheConfiguration[K, V]() cfg.setName(cacheName) cfg.setCacheMode(cacheMode) cfg.setIndexedTypes(indexedTypes:_*) + cfg.setAtomicityMode(atomicityMode) Ignition.ignite.createCache(cfg) } /** + * Creates cache with specified configuration in default grid. + * + * @param config Name of the cache to get. + */ + @inline def createCache$[K, V](config: CacheConfiguration[K, V]): IgniteCache[K, V] = ignite$.getOrCreateCache(config) + + /** * Gets named cache from specified grid. * * @param gridName Name of the grid. @@ -311,7 +324,7 @@ object scalar extends ScalarConversions { */ @inline def dataStreamer$[K, V]( @Nullable cacheName: String, - bufSize: Int): IgniteDataStreamer[K, V] = { + bufSize: Int = IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE): IgniteDataStreamer[K, V] = { val dl = ignite$.dataStreamer[K, V](cacheName) dl.perNodeBufferSize(bufSize) @@ -320,6 +333,103 @@ object scalar extends ScalarConversions { } /** + * Gets a new instance of Ignite set implementation with given name and parameters. + * + * @param name Name of set. + * @param mode Cache mode. + * @param atomicityMode Cache atomicity mode. + * @tparam T Type of stored values. + * @return New instance of Ignite set implementation. + */ + @inline def set$[T](name: String, mode: CacheMode = PARTITIONED, atomicityMode: CacheAtomicityMode = TRANSACTIONAL) = { + val setCfg: CollectionConfiguration = new CollectionConfiguration + + setCfg.setCacheMode(mode) + setCfg.setAtomicityMode(atomicityMode) + + ignite$.set[T](name, setCfg) + } + + /** + * Gets a new instance of Ignite atomic long implementation with given name and parameters. + * + * @param name Name of set. + * @param create Boolean flag indicating whether data structure should be created if does not exist. + * @param initVal Initial value for atomic long. Ignored if create flag is false. + * @tparam T Type of stored values. + * @return New instance of Ignite set implementation. + */ + @inline def long$[T](name: String, create: Boolean, initVal: Long = 0) = { + ignite$.atomicLong(name, initVal, create) + } + + /** + * Gets a new instance of Ignite atomic long implementation with given name and parameters. + * + * @param name Atomic stamped name. + * @param initVal Initial value for atomic stamped. Ignored if { @code create} flag is { @code false}. + * @param initStamp Initial stamp for atomic stamped. Ignored if { @code create} flag is { @code false}. + * @param create Boolean flag indicating whether data structure should be created if does not exist. + * @return Atomic stamped for the given name. + */ + @inline def atomicStamped$[T, S](name: String, @Nullable initVal: T, @Nullable initStamp: S, create: Boolean) = + ignite$.atomicStamped(name, initVal, initStamp, create) + + /** + * Gets a new instance of Ignite atomic sequence implementation with given name and parameters. + * + * @param name Atomic sequence name. + * @param initVal Initial value for atomic sequence. Ignored if { @code create} flag is { @code false}. + * @param create Boolean flag indicating whether data structure should be created if does not exist. + * @return Atomic sequence for the given name. + */ + @inline def atomicSequence$(name: String, initVal: Long, create: Boolean): IgniteAtomicSequence = + ignite$.atomicSequence(name, initVal, create) + + /** + * Gets a new instance of Ignite count down latch implementation with given name and parameters. + * + * @param name Name of the latch. + * @param cnt Count for new latch creation. Ignored if { @code create} flag is { @code false}. + * @param autoDel `true` to automatically delete latch from cache when its count reaches zero. + * Ignored if `create` flag is `false`. + * @param create Boolean flag indicating whether data structure should be created if does not exist. + + * @return Ignite count down latch for the given name. + */ + @inline def countDownLatch$(name: String, cnt: Int, autoDel: Boolean, create: Boolean): IgniteCountDownLatch = + ignite$.countDownLatch(name, cnt, autoDel, create) + + /** + * Gets a new instance of Ignite queue implementation with given name and parameters. + * + * @param queueName Queue name. + * @param mode Cache mode. + */ + @inline def queue$[T](queueName: String, mode: CacheMode = PARTITIONED) = { + val colCfg: CollectionConfiguration = new CollectionConfiguration + + colCfg.setCacheMode(mode) + + ignite$.queue[T](queueName, 0, colCfg) + } + + /** + * Start new transaction with specified parameters. + * + * @param concurrency Transaction concurrency control. + * @param isolation transaction isolation levels. + */ + @inline def transaction$(concurrency: TransactionConcurrency = DFLT_TX_CONCURRENCY, + isolation: TransactionIsolation = DFLT_TX_ISOLATION) = ignite$.transactions().txStart(concurrency, isolation) + + /** + * Gets ExecutorService which will execute all submitted Callable and Runnable jobs on all cluster nodes + */ + @inline def executorService$ = + ignite$.executorService() + + /** * Gets default grid instance. */ @inline def ignite$: Ignite = Ignition.ignite @@ -343,6 +453,11 @@ object scalar extends ScalarConversions { } /** + * Gets an instance of [[IgniteCluster]] interface. + */ + @inline def cluster$ = ignite$.cluster() + + /** * Gets grid for given node ID. * * @param locNodeId Local node ID for which to get grid instance option.