IGNITE-45 - Examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cbf64c92 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cbf64c92 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cbf64c92 Branch: refs/heads/ignite-45 Commit: cbf64c92ef81b5ea1d3bb99f1ec9fedeee005ecb Parents: 41d6c97 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sat Mar 21 00:48:39 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sat Mar 21 00:48:39 2015 -0700 ---------------------------------------------------------------------- .../store/CacheNodeWithStoreStartup.java | 4 - .../java7/datagrid/store/CacheStoreExample.java | 1 - .../store/dummy/CacheDummyPersonStore.java | 1 - .../hibernate/CacheHibernatePersonStore.java | 1 - .../store/jdbc/CacheJdbcPersonStore.java | 1 - .../store/jdbc/CacheJdbcPojoPersonStore.java | 1 - .../computegrid/ComputeBroadcastExample.java | 101 ++++++++++ .../computegrid/ComputeCallableExample.java | 70 +++++++ .../computegrid/ComputeClosureExample.java | 69 +++++++ .../ComputeClusterGroupsExample.java | 82 ++++++++ .../ComputeContinuousMapperExample.java | 158 ++++++++++++++++ .../ComputeExecutorServiceExample.java | 67 +++++++ .../ComputeFibonacciContinuationExample.java | 189 +++++++++++++++++++ .../computegrid/ComputeReducerExample.java | 87 +++++++++ .../computegrid/ComputeRunnableExample.java | 67 +++++++ .../computegrid/ComputeScheduleExample.java | 64 +++++++ .../computegrid/ComputeTaskMapExample.java | 106 +++++++++++ .../computegrid/ComputeTaskSplitExample.java | 93 +++++++++ .../failover/ComputeFailoverExample.java | 137 ++++++++++++++ .../failover/ComputeFailoverNodeStartup.java | 77 ++++++++ .../computegrid/failover/package-info.java | 22 +++ .../java8/computegrid/montecarlo/Credit.java | 110 +++++++++++ .../montecarlo/CreditRiskExample.java | 153 +++++++++++++++ 
.../montecarlo/CreditRiskManager.java | 143 ++++++++++++++ .../computegrid/montecarlo/package-info.java | 22 +++ .../java8/computegrid/package-info.java | 22 +++ .../ignite/examples/BasicExamplesSelfTest.java | 1 - .../ignite/examples/CacheExamplesSelfTest.java | 4 - ...heStoreLoadDataExampleMultiNodeSelfTest.java | 1 - .../examples/CheckpointExamplesSelfTest.java | 1 - .../ComputeClusterGroupsExampleSelfTest.java | 1 - .../examples/ContinuationExamplesSelfTest.java | 1 - .../ContinuousMapperExamplesSelfTest.java | 1 - .../examples/DeploymentExamplesSelfTest.java | 1 - .../ignite/examples/EventsExamplesSelfTest.java | 1 - ...ibernateL2CacheExampleMultiNodeSelfTest.java | 1 - .../HibernateL2CacheExampleSelfTest.java | 1 - .../ignite/examples/IgfsExamplesSelfTest.java | 1 - .../examples/LifecycleExamplesSelfTest.java | 1 - .../MemcacheRestExamplesMultiNodeSelfTest.java | 1 - .../examples/MemcacheRestExamplesSelfTest.java | 1 - .../examples/MessagingExamplesSelfTest.java | 1 - .../examples/MonteCarloExamplesSelfTest.java | 1 - .../examples/SpringBeanExamplesSelfTest.java | 1 - .../ignite/examples/TaskExamplesSelfTest.java | 1 - pom.xml | 40 ++-- 46 files changed, 1859 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/CacheNodeWithStoreStartup.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/CacheNodeWithStoreStartup.java b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/CacheNodeWithStoreStartup.java index 3191611..979e9f7 100644 --- a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/CacheNodeWithStoreStartup.java +++ b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/CacheNodeWithStoreStartup.java @@ 
-21,10 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.examples.datagrid.store.dummy.*; -import org.apache.ignite.examples.datagrid.store.hibernate.*; -import org.apache.ignite.examples.datagrid.store.jdbc.*; -import org.apache.ignite.examples.datagrid.store.model.*; import org.apache.ignite.examples.java7.datagrid.store.dummy.*; import org.apache.ignite.examples.java7.datagrid.store.hibernate.*; import org.apache.ignite.examples.java7.datagrid.store.jdbc.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/CacheStoreExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/CacheStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/CacheStoreExample.java index bcf43a1..e8e3d8f 100644 --- a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/CacheStoreExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/CacheStoreExample.java @@ -19,7 +19,6 @@ package org.apache.ignite.examples.java7.datagrid.store; import org.apache.ignite.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.examples.datagrid.store.model.*; import org.apache.ignite.examples.java7.datagrid.store.model.*; import org.apache.ignite.transactions.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/dummy/CacheDummyPersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/dummy/CacheDummyPersonStore.java 
b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/dummy/CacheDummyPersonStore.java index ff1be52..acbc08c 100644 --- a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/dummy/CacheDummyPersonStore.java +++ b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/dummy/CacheDummyPersonStore.java @@ -19,7 +19,6 @@ package org.apache.ignite.examples.java7.datagrid.store.dummy; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; -import org.apache.ignite.examples.datagrid.store.model.*; import org.apache.ignite.examples.java7.datagrid.store.model.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/hibernate/CacheHibernatePersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/hibernate/CacheHibernatePersonStore.java b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/hibernate/CacheHibernatePersonStore.java index 16c199e..2883375 100644 --- a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/hibernate/CacheHibernatePersonStore.java +++ b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/hibernate/CacheHibernatePersonStore.java @@ -18,7 +18,6 @@ package org.apache.ignite.examples.java7.datagrid.store.hibernate; import org.apache.ignite.cache.store.*; -import org.apache.ignite.examples.datagrid.store.model.*; import org.apache.ignite.examples.java7.datagrid.store.model.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/jdbc/CacheJdbcPersonStore.java 
---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/jdbc/CacheJdbcPersonStore.java index daf1674..349fde6 100644 --- a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/jdbc/CacheJdbcPersonStore.java +++ b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/jdbc/CacheJdbcPersonStore.java @@ -19,7 +19,6 @@ package org.apache.ignite.examples.java7.datagrid.store.jdbc; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; -import org.apache.ignite.examples.datagrid.store.model.*; import org.apache.ignite.examples.java7.datagrid.store.model.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/jdbc/CacheJdbcPojoPersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/jdbc/CacheJdbcPojoPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/jdbc/CacheJdbcPojoPersonStore.java index 87efd33..7760fb1 100644 --- a/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/jdbc/CacheJdbcPojoPersonStore.java +++ b/examples/src/main/java/org/apache/ignite/examples/java7/datagrid/store/jdbc/CacheJdbcPojoPersonStore.java @@ -19,7 +19,6 @@ package org.apache.ignite.examples.java7.datagrid.store.jdbc; import org.apache.ignite.*; import org.apache.ignite.cache.store.jdbc.*; -import org.apache.ignite.examples.datagrid.store.model.*; import org.apache.ignite.examples.java7.datagrid.store.model.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.h2.tools.*; 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeBroadcastExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeBroadcastExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeBroadcastExample.java new file mode 100644 index 0000000..72ca722 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeBroadcastExample.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java7.*; + +import java.util.*; + +/** + * Demonstrates broadcasting computations within cluster. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. 
+ * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeBroadcastExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Compute broadcast example started."); + + // Print hello message on all nodes. + hello(ignite); + + // Gather system info from all nodes. + gatherSystemInfo(ignite); + } + } + + /** + * Print 'Hello' message on all nodes. + * + * @param ignite Ignite instance. + * @throws IgniteException If failed. + */ + private static void hello(Ignite ignite) throws IgniteException { + // Print out hello message on all nodes. + ignite.compute().broadcast(() -> { + System.out.println(); + System.out.println(">>> Hello Node! :)"); + }); + + System.out.println(); + System.out.println(">>> Check all nodes for hello message output."); + } + + /** + * Gather system info from all nodes and print it out. + * + * @param ignite Ignite instance. + * @throws IgniteException if failed. + */ + private static void gatherSystemInfo(Ignite ignite) throws IgniteException { + // Gather system info from all nodes. 
+ Collection<String> res = ignite.compute().broadcast(() -> { + System.out.println(); + System.out.println("Executing task on node: " + ignite.cluster().localNode().id()); + + return "Node ID: " + ignite.cluster().localNode().id() + "\n" + + "OS: " + System.getProperty("os.name") + " " + System.getProperty("os.version") + " " + + System.getProperty("os.arch") + "\n" + + "User: " + System.getProperty("user.name") + "\n" + + "JRE: " + System.getProperty("java.runtime.name") + " " + + System.getProperty("java.runtime.version"); + }); + + // Print result. + System.out.println(); + System.out.println("Nodes system information:"); + System.out.println(); + + res.forEach(r -> { + System.out.println(r); + System.out.println(); + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeCallableExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeCallableExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeCallableExample.java new file mode 100644 index 0000000..cedab92 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeCallableExample.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java7.*; +import org.apache.ignite.lang.*; + +import java.util.*; +import java.util.stream.*; + +/** + * Demonstrates the use of {@link IgniteCallable} job execution on the cluster. + * <p> + * This example takes a sentence composed of multiple words and counts number of non-space + * characters in the sentence by having each compute job count characters in each individual + * word. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeCallableExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. 
+ */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Compute callable example started."); + + Stream<IgniteCallable<Integer>> calls = Stream.of("Count characters using callable".split(" ")).map(word -> + (IgniteCallable<Integer>)() -> { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + + return word.length(); + }); + + // Execute collection of callables on the ignite. + Collection<Integer> res = ignite.compute().call(calls.collect(Collectors.toList())); + + int sum = res.stream().mapToInt(i -> i).sum(); + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + sum + "'."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeClosureExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeClosureExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeClosureExample.java new file mode 100644 index 0000000..35f976b --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeClosureExample.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java7.*; + +import java.util.*; + +/** + * Demonstrates a simple use of Ignite with reduce closure. + * <p> + * This example splits a phrase into collection of words, computes their length on different + * nodes and then computes the total number of non-whitespace characters in the phrase. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeClosureExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Compute closure example started."); + + // Execute closure on all cluster nodes. + Collection<Integer> res = ignite.compute().apply( + (String word) -> { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + + // Return number of letters in the word. + return word.length(); + }, + // Job parameters. Ignite will create as many jobs as there are parameters. 
+ Arrays.asList("Count characters using closure".split(" ")) + ); + + int sum = res.stream().mapToInt(i -> i).sum(); + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + sum + "'."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeClusterGroupsExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeClusterGroupsExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeClusterGroupsExample.java new file mode 100644 index 0000000..b52bc7e --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeClusterGroupsExample.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.examples.java7.*; + +/** + * Demonstrates new functional APIs. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeClusterGroupsExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2)) + return; + + System.out.println(); + System.out.println("Compute example started."); + + IgniteCluster cluster = ignite.cluster(); + + // Say hello to all nodes in the cluster, including local node. + sayHello(ignite, cluster); + + // Say hello to all remote nodes. + sayHello(ignite, cluster.forRemotes()); + + // Pick random node out of remote nodes. + ClusterGroup randomNode = cluster.forRemotes().forRandom(); + + // Say hello to a random node. + sayHello(ignite, randomNode); + + // Say hello to all nodes residing on the same host with random node. + sayHello(ignite, cluster.forHost(randomNode.node())); + + // Say hello to all nodes that have current CPU load less than 50%. + sayHello(ignite, cluster.forPredicate(n -> n.metrics().getCurrentCpuLoad() < 0.5)); + } + } + + /** + * Print 'Hello' message on remote nodes. + * + * @param ignite Ignite. + * @param grp Cluster group. + * @throws IgniteException If failed. 
+ */ + private static void sayHello(Ignite ignite, final ClusterGroup grp) throws IgniteException { + // Print out hello message on all cluster nodes. + ignite.compute(grp).broadcast( + () -> System.out.println(">>> Hello Node: " + grp.ignite().cluster().localNode().id())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeContinuousMapperExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeContinuousMapperExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeContinuousMapperExample.java new file mode 100644 index 0000000..3e420e4 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeContinuousMapperExample.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.examples.java7.*; +import org.apache.ignite.resources.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Demonstrates usage of continuous mapper. With continuous mapper + * it is possible to continue mapping jobs asynchronously even after + * initial {@link ComputeTask#map(List, Object)} method completes. + * <p> + * String "Hello Continuous Mapper" is passed as an argument for execution + * of {@link ComputeContinuousMapperExample.ContinuousMapperTask}. As an outcome, participating + * nodes will print out a single word from the passed in string and return + * number of characters in that word. However, to demonstrate continuous + * mapping, next word will be mapped to a node only after the result from + * previous word has been received. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeContinuousMapperExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. 
+ */ + public static void main(String[] args) throws IgniteException { + System.out.println(); + System.out.println(">>> Compute continuous mapper example started."); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + int phraseLen = ignite.compute().execute(ContinuousMapperTask.class, "Hello Continuous Mapper"); + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + phraseLen + "'."); + } + } + + /** + * This task demonstrates how continuous mapper is used. The passed in phrase + * is split into multiple words and next word is sent out for processing only + * when the result for the previous word was received. + * <p> + * Note that annotation {@link ComputeTaskNoResultCache} is optional and tells Ignite + * not to accumulate results from individual jobs. In this example we increment + * total character count directly in {@link #result(ComputeJobResult, List)} method, + * and therefore don't need to accumulate them to be processed at reduction step. + */ + @ComputeTaskNoResultCache + private static class ContinuousMapperTask extends ComputeTaskAdapter<String, Integer> { + /** This field will be injected with task continuous mapper. */ + @TaskContinuousMapperResource + private ComputeTaskContinuousMapper mapper; + + /** Word queue. */ + private final Queue<String> words = new ConcurrentLinkedQueue<>(); + + /** Total character count. */ + private final AtomicInteger totalChrCnt = new AtomicInteger(0); + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, String phrase) { + if (phrase == null || phrase.isEmpty()) + throw new IgniteException("Phrase is empty."); + + // Populate word queue. + Collections.addAll(words, phrase.split(" ")); + + // Sends first word. + sendWord(); + + // Since we have sent at least one job, we are allowed to return + // 'null' from map method. 
+ return null; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + // If there is an error, fail-over to another node. + if (res.getException() != null) + return super.result(res, rcvd); + + // Add result to total character count. + totalChrCnt.addAndGet(res.<Integer>getData()); + + sendWord(); + + // If next word was sent, keep waiting, otherwise work queue is empty and we reduce. + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) { + return totalChrCnt.get(); + } + + /** + * Sends next queued word to the next node implicitly selected by load balancer. + */ + private void sendWord() { + // Remove first word from the queue. + String word = words.poll(); + + if (word != null) { + // Map next word. + mapper.send(new ComputeJobAdapter(word) { + @Override public Object execute() { + String word = argument(0); + + System.out.println(); + System.out.println(">>> Printing '" + word + "' from ignite job at time: " + new Date()); + + int cnt = word.length(); + + // Sleep for some time so it will be visually noticeable that + // jobs are executed sequentially. + try { + Thread.sleep(1000); + } + catch (InterruptedException ignored) { + // No-op. 
+ } + + return cnt; + } + }); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeExecutorServiceExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeExecutorServiceExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeExecutorServiceExample.java new file mode 100644 index 0000000..1208ce1 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeExecutorServiceExample.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java7.*; + +import java.util.concurrent.*; +import java.util.stream.*; + +/** + * Simple example to demonstrate usage of distributed executor service provided by Ignite. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. 
+ * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public final class ComputeExecutorServiceExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws Exception If example execution failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Compute executor service example started."); + + // Get ignite-enabled executor service. + ExecutorService exec = ignite.executorService(); + + Stream.of("Print words using runnable".split(" ")).forEach(word -> { + exec.submit(() -> { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + }); + }); + + exec.shutdown(); + + // Wait for all jobs to complete (0 means no limit). 
+ exec.awaitTermination(0, TimeUnit.MILLISECONDS); + + System.out.println(); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeFibonacciContinuationExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeFibonacciContinuationExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeFibonacciContinuationExample.java new file mode 100644 index 0000000..7c2e665 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeFibonacciContinuationExample.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.examples.java7.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.jetbrains.annotations.*; + +import java.math.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * This example demonstrates how to use continuation feature of Ignite by + * performing the distributed recursive calculation of {@code 'Fibonacci'} + * numbers on the cluster. Continuations + * functionality is exposed via {@link ComputeJobContext#holdcc()} and + * {@link ComputeJobContext#callcc()} method calls in {@link ComputeFibonacciContinuationExample.FibonacciClosure} class. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public final class ComputeFibonacciContinuationExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute Fibonacci continuation example started."); + + long N = 100; + + UUID exampleNodeId = ignite.cluster().localNode().id(); + + // Filter to exclude this node from execution. + IgnitePredicate<ClusterNode> nodeFilter = n -> + ignite.cluster().forRemotes().nodes().isEmpty() || !n.id().equals(exampleNodeId); + + long start = System.currentTimeMillis(); + + BigInteger fib = ignite.compute(ignite.cluster().forPredicate(nodeFilter)). 
+ apply(new FibonacciClosure(nodeFilter), N); + + long duration = System.currentTimeMillis() - start; + + System.out.println(); + System.out.println(">>> Finished executing Fibonacci for '" + N + "' in " + duration + " ms."); + System.out.println(">>> Fibonacci sequence for input number '" + N + "' is '" + fib + "'."); + System.out.println(">>> If you re-run this example w/o stopping remote nodes - the performance will"); + System.out.println(">>> increase since intermediate results are pre-cache on remote nodes."); + System.out.println(">>> You should see prints out every recursive Fibonacci execution on cluster nodes."); + System.out.println(">>> Check remote nodes for output."); + } + } + + /** + * Closure to execute. + */ + private static class FibonacciClosure implements IgniteClosure<Long, BigInteger> { + /** Future for spawned task. */ + private IgniteFuture<BigInteger> fut1; + + /** Future for spawned task. */ + private IgniteFuture<BigInteger> fut2; + + /** Auto-inject job context. */ + @JobContextResource + private ComputeJobContext jobCtx; + + /** Auto-inject ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Predicate. */ + private final IgnitePredicate<ClusterNode> nodeFilter; + + /** + * @param nodeFilter Predicate to filter nodes. + */ + FibonacciClosure(IgnitePredicate<ClusterNode> nodeFilter) { + this.nodeFilter = nodeFilter; + } + + /** {@inheritDoc} */ + @Nullable @Override public BigInteger apply(Long n) { + if (fut1 == null || fut2 == null) { + System.out.println(); + System.out.println(">>> Starting fibonacci execution for number: " + n); + + // Make sure n is not negative. + n = Math.abs(n); + + if (n <= 2) + return n == 0 ? BigInteger.ZERO : BigInteger.ONE; + + // Node-local storage. + ConcurrentMap<Long, IgniteFuture<BigInteger>> locMap = ignite.cluster().nodeLocalMap(); + + // Check if value is cached in node-local-map first. 
+ fut1 = locMap.get(n - 1); + fut2 = locMap.get(n - 2); + + ClusterGroup p = ignite.cluster().forPredicate(nodeFilter); + + IgniteCompute compute = ignite.compute(p).withAsync(); + + // If future is not cached in node-local-map, cache it. + if (fut1 == null) { + compute.apply(new FibonacciClosure(nodeFilter), n - 1); + + ComputeTaskFuture<BigInteger> futVal = compute.future(); + + fut1 = locMap.putIfAbsent(n - 1, futVal); + + if (fut1 == null) + fut1 = futVal; + } + + // If future is not cached in node-local-map, cache it. + if (fut2 == null) { + compute.apply(new FibonacciClosure(nodeFilter), n - 2); + + ComputeTaskFuture<BigInteger> futVal = compute.<BigInteger>future(); + + fut2 = locMap.putIfAbsent(n - 2, futVal); + + if (fut2 == null) + fut2 = futVal; + } + + // If futures are not done, then wait asynchronously for the result + if (!fut1.isDone() || !fut2.isDone()) { + IgniteInClosure<IgniteFuture<BigInteger>> lsnr = f -> { + // If both futures are done, resume the continuation. + if (fut1.isDone() && fut2.isDone()) + // CONTINUATION: + // ============= + // Resume suspended job execution. + jobCtx.callcc(); + }; + + // CONTINUATION: + // ============= + // Hold (suspend) job execution. + // It will be resumed in listener above via 'callcc()' call + // once both futures are done. + jobCtx.holdcc(); + + // Attach the same listener to both futures. + fut1.listen(lsnr); + fut2.listen(lsnr); + + return null; + } + } + + assert fut1.isDone() && fut2.isDone(); + + // Return cached results. 
+ return fut1.get().add(fut2.get()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeReducerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeReducerExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeReducerExample.java new file mode 100644 index 0000000..d8e2a48 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeReducerExample.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java7.*; +import org.apache.ignite.lang.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Demonstrates a simple use of Ignite with reduce closure. + * <p> + * Phrase is split into words and distributed across nodes where length of each word is + * calculated. Then total phrase length is calculated using reducer. 
+ * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start Ignite node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeReducerExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute reducer example started."); + + Integer sum = ignite.compute().apply( + word -> { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + + // Return number of letters in the word. + return word.length(); + }, + + // Job parameters. Ignite will create as many jobs as there are parameters. + Arrays.asList("Count characters using reducer".split(" ")), + + // Reducer to process results as they come. + new IgniteReducer<Integer, Integer>() { + private AtomicInteger sum = new AtomicInteger(); + + // Callback for every job result. + @Override public boolean collect(Integer len) { + sum.addAndGet(len); + + // Return true to continue waiting until all results are received. + return true; + } + + // Reduce all results into one. 
+ @Override public Integer reduce() { + return sum.get(); + } + } + ); + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + sum + "'."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeRunnableExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeRunnableExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeRunnableExample.java new file mode 100644 index 0000000..a78c863 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeRunnableExample.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java7.*; +import org.apache.ignite.lang.*; + +import java.util.stream.*; + +/** + * Demonstrates a simple use of {@link IgniteRunnable}. 
+ * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeRunnableExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute runnable example started."); + + // Enable asynchronous mode. + IgniteCompute compute = ignite.compute().withAsync(); + + Stream.of("Print words using runnable".split(" ")). + map(word -> { + // Execute runnable on some node. + compute.run(() -> { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + }); + + return compute.future(); + }). 
+ forEach(IgniteFuture::get); + + System.out.println(); + System.out.println(">>> Finished printing words using runnable execution."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeScheduleExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeScheduleExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeScheduleExample.java new file mode 100644 index 0000000..11e096f --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeScheduleExample.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java7.*; +import org.apache.ignite.scheduler.*; + +/** + * Demonstrates a cron-based {@link Runnable} execution scheduling. 
+ * Test runnable object broadcasts a phrase to all cluster nodes every minute + * three times with initial scheduling delay equal to five seconds. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeScheduleExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute schedule example started."); + + // Schedule output message every minute. + SchedulerFuture<?> fut = ignite.scheduler().scheduleLocal(() -> + ignite.compute().broadcast(() -> { + System.out.println(); + System.out.println("Howdy! :) "); + }), + "{5, 3} * * * * *" // Cron expression. 
+ ); + + while (!fut.isDone()) + System.out.println(">>> Invocation #: " + fut.get()); + + System.out.println(); + System.out.println(">>> Schedule future is done and has been unscheduled."); + System.out.println(">>> Check all nodes for hello message output."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeTaskMapExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeTaskMapExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeTaskMapExample.java new file mode 100644 index 0000000..bc8f305 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeTaskMapExample.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.examples.java7.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Demonstrates a simple use of Ignite with + * {@link ComputeTaskAdapter}. + * <p> + * Phrase passed as task argument is split into words on map stage and distributed among cluster nodes. + * Each node computes word length and returns result to master node where total phrase length is + * calculated on reduce stage. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeTaskMapExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute task map example started."); + + // Execute task on the cluster and wait for its completion. + int cnt = ignite.compute().execute(CharacterCountTask.class, "Hello Ignite Enabled World!"); + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + cnt + "'."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } + + /** + * Task to count non-white-space characters in a phrase. 
+ */ + private static class CharacterCountTask extends ComputeTaskAdapter<String, Integer> { + /** + * Splits the received string to words, creates a child job for each word, and sends + * these jobs to other nodes for processing. Each such job simply prints out the received word. + * + * @param nodes Nodes available for this task execution. + * @param arg String to split into words for processing. + * @return Map of jobs to nodes. + */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, String arg) { + Map<ComputeJob, ClusterNode> map = new HashMap<>(); + + Iterator<ClusterNode> it = nodes.iterator(); + + for (final String word : arg.split(" ")) { + // If we used all nodes, restart the iterator. + if (!it.hasNext()) + it = nodes.iterator(); + + ClusterNode node = it.next(); + + map.put(new ComputeJobAdapter() { + @Nullable @Override public Object execute() { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + + // Return number of letters in the word. 
+ return word.length(); + } + }, node); + } + + return map; + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer reduce(List<ComputeJobResult> results) { + return results.stream().mapToInt(ComputeJobResult::getData).sum(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeTaskSplitExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeTaskSplitExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeTaskSplitExample.java new file mode 100644 index 0000000..0b53ab9 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/ComputeTaskSplitExample.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.java8.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.examples.java7.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.stream.*; + +/** + * Demonstrates a simple use of Ignite with {@link ComputeTaskSplitAdapter}. + * <p> + * Phrase passed as task argument is split into jobs each taking one word. Then jobs are distributed among + * cluster nodes. Each node computes word length and returns result to master node where total phrase length + * is calculated on reduce stage. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeTaskSplitExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute task split example started."); + + // Execute task on the cluster and wait for its completion. + int cnt = ignite.compute().execute(CharacterCountTask.class, "Hello Ignite Enabled World!"); + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + cnt + "'."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } + + /** + * Task to count non-white-space characters in a phrase. 
+ */ + private static class CharacterCountTask extends ComputeTaskSplitAdapter<String, Integer> { + /** + * Splits the received string to words, creates a child job for each word, and sends + * these jobs to other nodes for processing. Each such job simply prints out the received word. + * + * @param clusterSize Number of available cluster nodes. Note that returned number of + * jobs can be less, equal or greater than this cluster size. + * @param arg Task execution argument. Can be {@code null}. + * @return The list of child jobs. + */ + @Override protected Collection<? extends ComputeJob> split(int clusterSize, String arg) { + return Stream.of(arg.split(" ")).map(word -> + new ComputeJobAdapter() { + @Nullable @Override public Object execute() { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + + // Return number of letters in the word. + return word.length(); + } + }).collect(Collectors.toList()); + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer reduce(List<ComputeJobResult> results) { + return results.stream().mapToInt(ComputeJobResult::getData).sum(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/ComputeFailoverExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/ComputeFailoverExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/ComputeFailoverExample.java new file mode 100644 index 0000000..3c4d393 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/ComputeFailoverExample.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.computegrid.failover; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.examples.java7.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; + +import java.util.*; + +/** + * Demonstrates the usage of checkpoints in Ignite. + * <p> + * The example tries to compute phrase length. In order to mitigate possible node failures, intermediate + * result is saved as a checkpoint after each job step. + * <p> + * Remote nodes must be started using {@link ComputeFailoverNodeStartup}. + */ +public class ComputeFailoverExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException{ + try (Ignite ignite = Ignition.start(ComputeFailoverNodeStartup.configuration())) { + if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2)) + return; + + System.out.println(); + System.out.println("Compute failover example started."); + + // Number of letters. 
+ int charCnt = ignite.compute().apply(new CheckPointJob(), "Stage1 Stage2"); + + System.out.println(); + System.out.println(">>> Finished executing fail-over example with checkpoints."); + System.out.println(">>> Total number of characters in the phrase is '" + charCnt + "'."); + System.out.println(">>> You should see exception stack trace from failed job on some node."); + System.out.println(">>> Failed job will be failed over to another node."); + } + } + + @ComputeTaskSessionFullSupport + private static final class CheckPointJob implements IgniteClosure<String, Integer> { + /** Injected distributed task session. */ + @TaskSessionResource + private ComputeTaskSession jobSes; + + /** Injected ignite logger. */ + @LoggerResource + private IgniteLogger log; + + /** */ + private IgniteBiTuple<Integer, Integer> state; + + /** */ + private String phrase; + + /** + * The job loads the checkpoint stored under its checkpoint key and, if one exists, + * resumes from the saved word index and accumulated length. After processing the first + * word it throws an exception on purpose to simulate a failure and trigger fail-over. + */ + @Override public Integer apply(String phrase) { + System.out.println(); + System.out.println(">>> Executing fail-over example job."); + + this.phrase = phrase; + + List<String> words = Arrays.asList(phrase.split(" ")); + + final String cpKey = checkpointKey(); + + IgniteBiTuple<Integer, Integer> state = jobSes.loadCheckpoint(cpKey); + + int idx = 0; + int sum = 0; + + if (state != null) { + this.state = state; + + // Last processed word index and total length. + idx = state.get1(); + sum = state.get2(); + } + + for (int i = idx; i < words.size(); i++) { + sum += words.get(i).length(); + + this.state = new IgniteBiTuple<>(i + 1, sum); + + // Save checkpoint with scope of task execution. + // It will be automatically removed when task completes. + jobSes.saveCheckpoint(cpKey, this.state); + + // For example purposes, we fail on purpose after first stage. + // This exception will cause job to be failed over to another node. 
+ if (i == 0) { + System.out.println(); + System.out.println(">>> Job will be failed over to another node."); + + throw new ComputeJobFailoverException("Expected example job exception."); + } + } + + return sum; + } + + /** + * Make reasonably unique checkpoint key. + * + * @return Checkpoint key. + */ + private String checkpointKey() { + return CheckPointJob.class.getName() + '-' + phrase; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/ComputeFailoverNodeStartup.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/ComputeFailoverNodeStartup.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/ComputeFailoverNodeStartup.java new file mode 100644 index 0000000..a67c9fc --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/ComputeFailoverNodeStartup.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.java8.computegrid.failover; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.checkpoint.sharedfs.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + +import java.util.*; + +/** + * Starts up an empty node with checkpoint-enabled configuration. + * <p> + * The difference is that running this class from IDE adds all example classes to classpath + * but running from command line doesn't. + */ +public class ComputeFailoverNodeStartup { + /** + * Start up an empty node with specified configuration. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + Ignition.start(configuration()); + } + + /** + * Create Ignite configuration with configured checkpoints. + * + * @return Ignite configuration. + * @throws IgniteException If configuration creation failed. + */ + public static IgniteConfiguration configuration() throws IgniteException { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setLocalHost("127.0.0.1"); + cfg.setPeerClassLoadingEnabled(true); + + // Configure checkpoint SPI. + SharedFsCheckpointSpi checkpointSpi = new SharedFsCheckpointSpi(); + + checkpointSpi.setDirectoryPaths(Collections.singletonList("work/checkpoint/sharedfs")); + + cfg.setCheckpointSpi(checkpointSpi); + + // Configure discovery SPI. 
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); + + ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509")); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/package-info.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/package-info.java new file mode 100644 index 0000000..876965f --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/failover/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Compute failover example. 
+ */ +package org.apache.ignite.examples.java8.computegrid.failover; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cbf64c92/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/montecarlo/Credit.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/montecarlo/Credit.java b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/montecarlo/Credit.java new file mode 100644 index 0000000..50d17e0 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/computegrid/montecarlo/Credit.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.computegrid.montecarlo; + +import java.io.*; + +/** + * This class provides a simple model for a credit contract (or a loan). It is basically + * defined as remaining crediting amount to date, credit remaining term, APR and annual + * probability of default. Although this model is simplified for the purpose + * of this example, it is close enough to emulate the real-life credit + * risk assessment application.
/**
 * Simple model of a credit contract (a loan). A credit is described by the amount
 * remaining to date, the remaining term, the annual percentage rate (APR) and the
 * expected annual probability of default.
 */
public class Credit implements Serializable {
    /** Remaining crediting amount. */
    private final double remAmnt;

    /** Remaining crediting term. */
    private final int remTerm;

    /** Annual percentage rate (APR). */
    private final double apr;

    /** Expected annual probability of default (EaDF). */
    private final double edf;

    /**
     * Creates new credit instance with given information.
     *
     * @param remAmnt Remaining crediting amount.
     * @param remTerm Remaining crediting term.
     * @param apr Annual percentage rate (APR).
     * @param edf Expected annual probability of default (EaDF).
     */
    public Credit(double remAmnt, int remTerm, double apr, double edf) {
        this.remAmnt = remAmnt;
        this.remTerm = remTerm;
        this.apr = apr;
        this.edf = edf;
    }

    /**
     * Gets remaining crediting amount.
     *
     * @return Remaining amount of credit.
     */
    double getRemainingAmount() {
        return remAmnt;
    }

    /**
     * Gets remaining crediting term.
     *
     * @return Remaining crediting term in days.
     */
    int getRemainingTerm() {
        return remTerm;
    }

    /**
     * Gets annual percentage rate.
     *
     * @return Annual percentage rate in relative percents (percentage / 100).
     */
    double getAnnualRate() {
        return apr;
    }

    /**
     * Gets the probability of default over the shorter of the given term and the
     * remaining crediting term, derived from the annual probability of default.
     *
     * @param term Default term.
     * @return Credit probability of default in relative percents
     *      (percentage / 100).
     */
    double getDefaultProbability(int term) {
        // The period is capped by the remaining term of the credit.
        int days = Math.min(remTerm, term);

        double annualSurvivalLog = Math.log(1 - edf);

        return 1 - Math.exp(annualSurvivalLog * days / 365.0);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return getClass().getName() +
            " [remAmnt=" + remAmnt +
            ", remTerm=" + remTerm +
            ", apr=" + apr +
            ", edf=" + edf +
            ']';
    }
}
+ */ + +package org.apache.ignite.examples.java8.computegrid.montecarlo; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java7.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Monte-Carlo example. Demonstrates distributed credit risk calculation. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public final class CreditRiskExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Credit risk example started."); + + // Create portfolio. + Credit[] portfolio = new Credit[5000]; + + Random rnd = new Random(); + + // Generate some test portfolio items. + for (int i = 0; i < portfolio.length; i++) { + portfolio[i] = new Credit( + 50000 * rnd.nextDouble(), // Credit amount. + rnd.nextInt(1000), // Credit term in days. + rnd.nextDouble() / 10, // APR. + rnd.nextDouble() / 20 + 0.02 // EDF. + ); + } + + // Forecast horizon in days. + int horizon = 365; + + // Number of Monte-Carlo iterations. + int iter = 10000; + + // Percentile. + double percentile = 0.95; + + // Mark the stopwatch. + long start = System.currentTimeMillis(); + + // Calculate credit risk and print it out. + // As you can see the ignite enabling is completely hidden from the caller + // and it is fully transparent to him. In fact, the caller is never directly + // aware if method was executed just locally or on the 100s of cluster nodes. 
+ // Credit risk crdRisk is the minimal amount that creditor has to have + // available to cover possible defaults. + + double crdRisk = ignite.compute().call(jobs(ignite.cluster().nodes().size(), portfolio, horizon, iter, percentile), + new IgniteReducer<Double, Double>() { + /** Collected values sum. */ + private double sum; + + /** Collected values count. */ + private int cnt; + + /** {@inheritDoc} */ + @Override public synchronized boolean collect(Double e) { + sum += e; + cnt++; + + return true; + } + + /** {@inheritDoc} */ + @Override public synchronized Double reduce() { + return sum / cnt; + } + }); + + System.out.println(); + System.out.println("Credit risk [crdRisk=" + crdRisk + ", duration=" + + (System.currentTimeMillis() - start) + "ms]"); + } + // We specifically don't do any error handling here to + // simplify the example. Real application may want to + // add error handling and application specific recovery. + } + + /** + * Creates closures for calculating credit risks. + * + * @param clusterSize Size of the cluster. + * @param portfolio Portfolio. + * @param horizon Forecast horizon in days. + * @param iter Number of Monte-Carlo iterations. + * @param percentile Percentile. + * @return Collection of closures. + */ + private static Collection<IgniteCallable<Double>> jobs(int clusterSize, final Credit[] portfolio, + final int horizon, int iter, final double percentile) { + // Number of iterations should be done by each node. + int iterPerNode = Math.round(iter / (float)clusterSize); + + // Number of iterations for the last/the only node. + int lastNodeIter = iter - (clusterSize - 1) * iterPerNode; + + Collection<IgniteCallable<Double>> clos = new ArrayList<>(clusterSize); + + // Note that for the purpose of this example we perform a simple homogeneous + // (non weighted) split assuming that all computing resources in this split + // will be identical. 
In real life scenarios when heterogeneous environment + // is used a split that is weighted by, for example, CPU benchmarks of each + // node in the split will be more efficient. It is fairly easy addition and + // Ignite comes with convenient Spring-compatible benchmark that can be + // used for weighted splits. + for (int i = 0; i < clusterSize; i++) { + final int nodeIter = i == clusterSize - 1 ? lastNodeIter : iterPerNode; + + clos.add(new IgniteCallable<Double>() { + /** {@inheritDoc} */ + @Override public Double call() { + return new CreditRiskManager().calculateCreditRiskMonteCarlo( + portfolio, horizon, nodeIter, percentile); + } + }); + } + + return clos; + } +}