# IGNITE-45 - Examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b473e309 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b473e309 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b473e309 Branch: refs/heads/ignite-45 Commit: b473e309950b5388c007832b5bcfcb089f11e169 Parents: 6295600 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sat Mar 21 14:59:35 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sat Mar 21 14:59:35 2015 -0700 ---------------------------------------------------------------------- examples/config/example-cache.xml | 10 +- .../ignite/examples/ExampleNodeStartup.java | 35 ++ .../apache/ignite/examples/ExamplesUtils.java | 132 ++++++ .../computegrid/ComputeBroadcastExample.java | 114 ++++++ .../computegrid/ComputeCallableExample.java | 79 ++++ .../computegrid/ComputeClosureExample.java | 77 ++++ .../ComputeClusterGroupsExample.java | 93 +++++ .../ComputeContinuousMapperExample.java | 158 +++++++ .../ComputeExecutorServiceExample.java | 71 ++++ .../ComputeFibonacciContinuationExample.java | 194 +++++++++ .../computegrid/ComputeReducerExample.java | 89 ++++ .../computegrid/ComputeRunnableExample.java | 74 ++++ .../computegrid/ComputeScheduleExample.java | 81 ++++ .../computegrid/ComputeTaskMapExample.java | 113 +++++ .../computegrid/ComputeTaskSplitExample.java | 102 +++++ .../failover/ComputeFailoverExample.java | 137 +++++++ .../failover/ComputeFailoverNodeStartup.java | 77 ++++ .../computegrid/failover/package-info.java | 22 + .../examples/computegrid/montecarlo/Credit.java | 110 +++++ .../montecarlo/CreditRiskExample.java | 153 +++++++ .../montecarlo/CreditRiskManager.java | 143 +++++++ .../computegrid/montecarlo/package-info.java | 22 + .../examples/computegrid/package-info.java | 22 + .../examples/datagrid/CacheAffinityExample.java | 138 +++++++ .../examples/datagrid/CacheApiExample.java | 128 ++++++ .../datagrid/CacheContinuousQueryExample.java | 107 +++++ .../datagrid/CacheDataStreamerExample.java | 91 +++++ .../examples/datagrid/CacheEventsExample.java | 99 +++++ .../datagrid/CachePopularNumbersExample.java | 172 ++++++++ .../examples/datagrid/CachePutGetExample.java | 113 +++++ .../examples/datagrid/CacheQueryExample.java | 407 +++++++++++++++++++ .../datagrid/CacheTransactionExample.java | 148 +++++++ .../hibernate/HibernateL2CacheExample.java | 204 ++++++++++ .../HibernateL2CacheExampleNodeStartup.java | 97 +++++ .../examples/datagrid/hibernate/Post.java | 126 ++++++ .../examples/datagrid/hibernate/User.java | 151 +++++++ .../datagrid/hibernate/package-info.java | 22 + .../ignite/examples/datagrid/package-info.java | 22 + .../starschema/CacheStarSchemaExample.java | 244 +++++++++++ .../datagrid/starschema/DimProduct.java | 101 +++++ .../examples/datagrid/starschema/DimStore.java | 101 +++++ .../datagrid/starschema/FactPurchase.java | 103 +++++ .../datagrid/starschema/package-info.java | 22 + .../store/CacheNodeWithStoreStartup.java | 155 +++++++ .../datagrid/store/CacheStoreExample.java | 107 +++++ .../store/CacheStoreLoadDataExample.java | 67 +++ .../store/dummy/CacheDummyPersonStore.java | 120 ++++++ .../datagrid/store/dummy/package-info.java | 22 + .../hibernate/CacheHibernatePersonStore.java | 291 +++++++++++++ .../datagrid/store/hibernate/Person.hbm.xml | 34 ++ .../datagrid/store/hibernate/hibernate.cfg.xml | 41 ++ .../datagrid/store/hibernate/package-info.java | 22 + .../store/jdbc/CacheJdbcPersonStore.java | 275 +++++++++++++ .../store/jdbc/CacheJdbcPojoPersonStore.java | 82 ++++ .../datagrid/store/jdbc/package-info.java | 22 + .../examples/datagrid/store/model/Person.java | 155 +++++++ .../examples/datagrid/store/package-info.java | 22 + .../datastructures/IgniteAtomicLongExample.java | 74 ++++ .../IgniteAtomicReferenceExample.java | 110 +++++ .../IgniteAtomicSequenceExample.java | 91 +++++ .../IgniteAtomicStampedExample.java | 117 ++++++ .../IgniteCountDownLatchExample.java | 95 +++++ .../datastructures/IgniteQueueExample.java | 215 ++++++++++ .../datastructures/IgniteSetExample.java | 197 +++++++++ .../examples/datastructures/package-info.java | 22 + .../ignite/examples/events/EventsExample.java | 144 +++++++ .../ignite/examples/events/package-info.java | 22 + .../ignite/examples/igfs/IgfsExample.java | 278 +++++++++++++ .../examples/igfs/IgfsMapReduceExample.java | 249 ++++++++++++ .../ignite/examples/igfs/IgfsNodeStartup.java | 41 ++ .../ignite/examples/igfs/package-info.java | 22 + .../examples/java7/ExampleNodeStartup.java | 35 -- .../ignite/examples/java7/ExamplesUtils.java | 135 ------ .../computegrid/ComputeBroadcastExample.java | 114 ------ .../computegrid/ComputeCallableExample.java | 79 ---- .../computegrid/ComputeClosureExample.java | 77 ---- .../ComputeClusterGroupsExample.java | 93 ----- .../ComputeContinuousMapperExample.java | 158 ------- .../ComputeExecutorServiceExample.java | 71 ---- .../ComputeFibonacciContinuationExample.java | 194 --------- .../computegrid/ComputeReducerExample.java | 89 ---- .../computegrid/ComputeRunnableExample.java | 74 ---- .../computegrid/ComputeScheduleExample.java | 81 ---- .../computegrid/ComputeTaskMapExample.java | 113 ----- .../computegrid/ComputeTaskSplitExample.java | 102 ----- .../failover/ComputeFailoverExample.java | 137 ------- .../failover/ComputeFailoverNodeStartup.java | 77 ---- .../computegrid/failover/package-info.java | 22 - .../java7/computegrid/montecarlo/Credit.java | 110 ----- .../montecarlo/CreditRiskExample.java | 153 ------- .../montecarlo/CreditRiskManager.java | 143 ------- .../computegrid/montecarlo/package-info.java | 22 - .../java7/computegrid/package-info.java | 22 - .../java7/datagrid/CacheAffinityExample.java | 138 ------- .../java7/datagrid/CacheApiExample.java | 128 ------ .../datagrid/CacheContinuousQueryExample.java | 107 ----- .../datagrid/CacheDataStreamerExample.java | 91 ----- .../java7/datagrid/CacheEventsExample.java | 99 ----- .../datagrid/CachePopularNumbersExample.java | 172 -------- .../java7/datagrid/CachePutGetExample.java | 113 ----- .../java7/datagrid/CacheQueryExample.java | 407 ------------------- .../java7/datagrid/CacheTransactionExample.java | 148 ------- .../hibernate/HibernateL2CacheExample.java | 204 ---------- .../HibernateL2CacheExampleNodeStartup.java | 97 ----- .../examples/java7/datagrid/hibernate/Post.java | 126 ------ .../examples/java7/datagrid/hibernate/User.java | 151 ------- .../java7/datagrid/hibernate/package-info.java | 22 - .../examples/java7/datagrid/package-info.java | 22 - .../starschema/CacheStarSchemaExample.java | 244 ----------- .../java7/datagrid/starschema/DimProduct.java | 101 ----- .../java7/datagrid/starschema/DimStore.java | 101 ----- .../java7/datagrid/starschema/FactPurchase.java | 103 ----- .../java7/datagrid/starschema/package-info.java | 22 - .../store/CacheNodeWithStoreStartup.java | 155 ------- .../java7/datagrid/store/CacheStoreExample.java | 107 ----- .../store/CacheStoreLoadDataExample.java | 67 --- .../store/dummy/CacheDummyPersonStore.java | 120 ------ .../datagrid/store/dummy/package-info.java | 22 - .../hibernate/CacheHibernatePersonStore.java | 291 ------------- .../datagrid/store/hibernate/Person.hbm.xml | 34 -- .../datagrid/store/hibernate/hibernate.cfg.xml | 41 -- .../datagrid/store/hibernate/package-info.java | 22 - .../store/jdbc/CacheJdbcPersonStore.java | 275 ------------- .../store/jdbc/CacheJdbcPojoPersonStore.java | 82 ---- .../java7/datagrid/store/jdbc/package-info.java | 22 - .../java7/datagrid/store/model/Person.java | 155 ------- .../java7/datagrid/store/package-info.java | 22 - .../datastructures/IgniteAtomicLongExample.java | 74 ---- .../IgniteAtomicReferenceExample.java | 110 ----- .../IgniteAtomicSequenceExample.java | 91 ----- .../IgniteAtomicStampedExample.java | 117 ------ .../IgniteCountDownLatchExample.java | 95 ----- .../datastructures/IgniteQueueExample.java | 215 ---------- .../java7/datastructures/IgniteSetExample.java | 197 --------- .../java7/datastructures/package-info.java | 22 - .../examples/java7/events/EventsExample.java | 144 ------- .../examples/java7/events/package-info.java | 22 - .../ignite/examples/java7/igfs/IgfsExample.java | 278 ------------- .../java7/igfs/IgfsMapReduceExample.java | 249 ------------ .../examples/java7/igfs/IgfsNodeStartup.java | 41 -- .../examples/java7/igfs/package-info.java | 22 - .../java7/messaging/MessagingExample.java | 173 -------- .../messaging/MessagingPingPongExample.java | 133 ------ .../MessagingPingPongListenActorExample.java | 106 ----- .../examples/java7/messaging/package-info.java | 22 - .../client/memcache/MemcacheRestExample.java | 125 ------ .../MemcacheRestExampleNodeStartup.java | 92 ----- .../misc/client/memcache/package-info.java | 22 - .../java7/misc/client/package-info.java | 22 - .../misc/deployment/DeploymentExample.java | 129 ------ .../java7/misc/deployment/package-info.java | 22 - .../java7/misc/lifecycle/LifecycleExample.java | 92 ----- .../java7/misc/lifecycle/package-info.java | 22 - .../examples/java7/misc/package-info.java | 22 - .../misc/springbean/SpringBeanExample.java | 88 ---- .../java7/misc/springbean/package-info.java | 22 - .../java7/misc/springbean/spring-bean.xml | 70 ---- .../ignite/examples/java7/package-info.java | 22 - .../java7/servicegrid/ServicesExample.java | 167 -------- .../java7/servicegrid/SimpleMapService.java | 49 --- .../java7/servicegrid/SimpleMapServiceImpl.java | 71 ---- .../java7/streaming/marketdata/CacheConfig.java | 44 -- .../java7/streaming/marketdata/Instrument.java | 106 ----- .../java7/streaming/marketdata/MarketTick.java | 59 --- .../marketdata/QueryTopInstruments.java | 73 ---- .../streaming/marketdata/StreamMarketData.java | 109 ----- .../java7/streaming/numbers/CacheConfig.java | 46 --- .../streaming/numbers/QueryPopularNumbers.java | 74 ---- .../streaming/numbers/StreamRandomNumbers.java | 79 ---- .../examples/messaging/MessagingExample.java | 173 ++++++++ .../messaging/MessagingPingPongExample.java | 133 ++++++ .../MessagingPingPongListenActorExample.java | 106 +++++ .../ignite/examples/messaging/package-info.java | 22 + .../client/memcache/MemcacheRestExample.java | 125 ++++++ .../MemcacheRestExampleNodeStartup.java | 92 +++++ .../misc/client/memcache/package-info.java | 22 + .../examples/misc/client/package-info.java | 22 + .../misc/deployment/DeploymentExample.java | 129 ++++++ .../examples/misc/deployment/package-info.java | 22 + .../misc/lifecycle/LifecycleExample.java | 92 +++++ .../examples/misc/lifecycle/package-info.java | 22 + .../ignite/examples/misc/package-info.java | 22 + .../misc/springbean/SpringBeanExample.java | 88 ++++ .../examples/misc/springbean/package-info.java | 22 + .../examples/misc/springbean/spring-bean.xml | 70 ++++ .../apache/ignite/examples/package-info.java | 22 + .../examples/servicegrid/ServicesExample.java | 167 ++++++++ .../examples/servicegrid/SimpleMapService.java | 49 +++ .../servicegrid/SimpleMapServiceImpl.java | 71 ++++ .../streaming/marketdata/CacheConfig.java | 44 ++ .../streaming/marketdata/Instrument.java | 106 +++++ .../streaming/marketdata/MarketTick.java | 59 +++ .../marketdata/QueryTopInstruments.java | 73 ++++ .../streaming/marketdata/StreamMarketData.java | 109 +++++ .../examples/streaming/numbers/CacheConfig.java | 46 +++ .../streaming/numbers/QueryPopularNumbers.java | 74 ++++ .../streaming/numbers/StreamRandomNumbers.java | 79 ++++ .../ignite/examples/java8/ComputeExample.java | 56 --- .../examples/java8/ExampleNodeStartup.java | 35 -- .../ignite/examples/java8/ExamplesUtils.java | 132 ------ .../ignite/examples/java8/MessagingExample.java | 162 -------- .../computegrid/ComputeBroadcastExample.java | 2 +- .../computegrid/ComputeCallableExample.java | 2 +- .../computegrid/ComputeClosureExample.java | 2 +- .../ComputeClusterGroupsExample.java | 2 +- .../ComputeContinuousMapperExample.java | 2 +- .../ComputeExecutorServiceExample.java | 2 +- .../ComputeFibonacciContinuationExample.java | 2 +- .../computegrid/ComputeReducerExample.java | 2 +- .../computegrid/ComputeRunnableExample.java | 2 +- .../computegrid/ComputeScheduleExample.java | 2 +- .../computegrid/ComputeTaskMapExample.java | 2 +- .../computegrid/ComputeTaskSplitExample.java | 2 +- .../failover/ComputeFailoverExample.java | 2 +- .../montecarlo/CreditRiskExample.java | 2 +- .../java8/messaging/MessagingExample.java | 163 ++++++++ .../marketdata/QueryTopInstruments.java | 2 +- .../streaming/marketdata/StreamMarketData.java | 8 +- .../streaming/numbers/QueryPopularNumbers.java | 2 +- .../streaming/numbers/StreamRandomNumbers.java | 2 +- .../ignite/examples/BasicExamplesSelfTest.java | 2 +- .../ignite/examples/CacheExamplesSelfTest.java | 8 +- ...heStoreLoadDataExampleMultiNodeSelfTest.java | 2 +- .../examples/CheckpointExamplesSelfTest.java | 2 +- .../ComputeClusterGroupsExampleSelfTest.java | 2 +- .../examples/ContinuationExamplesSelfTest.java | 2 +- .../ContinuousMapperExamplesSelfTest.java | 2 +- .../examples/DeploymentExamplesSelfTest.java | 2 +- .../ignite/examples/EventsExamplesSelfTest.java | 2 +- ...ibernateL2CacheExampleMultiNodeSelfTest.java | 2 +- .../HibernateL2CacheExampleSelfTest.java | 2 +- .../ignite/examples/IgfsExamplesSelfTest.java | 2 +- .../examples/LifecycleExamplesSelfTest.java | 2 +- .../MemcacheRestExamplesMultiNodeSelfTest.java | 2 +- .../examples/MemcacheRestExamplesSelfTest.java | 2 +- .../examples/MessagingExamplesSelfTest.java | 2 +- .../examples/MonteCarloExamplesSelfTest.java | 2 +- .../examples/SpringBeanExamplesSelfTest.java | 2 +- .../ignite/examples/TaskExamplesSelfTest.java | 2 +- 239 files changed, 10056 insertions(+), 10285 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/config/example-cache.xml ---------------------------------------------------------------------- diff --git a/examples/config/example-cache.xml b/examples/config/example-cache.xml index 00aacab..1cf1e2d 100644 --- a/examples/config/example-cache.xml +++ b/examples/config/example-cache.xml @@ -64,11 +64,11 @@ <list> <!-- Key and value type for SQL table Organization. --> <value>java.util.UUID</value> - <value>org.apache.ignite.examples.java7.datagrid.CacheQueryExample.Organization</value> + <value>org.apache.ignite.examples.datagrid.CacheQueryExample.Organization</value> <!-- Key and value type for SQL table Person. --> <value>org.apache.ignite.cache.affinity.CacheAffinityKey</value> - <value>org.apache.ignite.examples.java7.datagrid.CacheQueryExample.Person</value> + <value>org.apache.ignite.examples.datagrid.CacheQueryExample.Person</value> <!-- Key and value type for SQL table Long. --> <value>java.lang.Integer</value> @@ -76,7 +76,7 @@ <!-- Key and value type for SQL table FactPurchase. --> <value>java.lang.Integer</value> - <value>org.apache.ignite.examples.java7.datagrid.starschema.FactPurchase</value> + <value>org.apache.ignite.examples.datagrid.starschema.FactPurchase</value> </list> </property> </bean> @@ -107,11 +107,11 @@ <list> <!-- Key and value type for SQL table DimStore. --> <value>java.lang.Integer</value> - <value>org.apache.ignite.examples.java7.datagrid.starschema.DimStore</value> + <value>org.apache.ignite.examples.datagrid.starschema.DimStore</value> <!-- Key and value type for SQL table DimProduct. --> <value>java.lang.Integer</value> - <value>org.apache.ignite.examples.java7.datagrid.starschema.DimProduct</value> + <value>org.apache.ignite.examples.datagrid.starschema.DimProduct</value> </list> </property> </bean> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java b/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java new file mode 100644 index 0000000..f972b53 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples; + +import org.apache.ignite.*; + +/** + * Starts up an empty node with example compute configuration. + */ +public class ExampleNodeStartup { + /** + * Start up an empty node with example compute configuration. + * + * @param args Command line arguments, none required. + * @throws IgniteException If failed. + */ + public static void main(String[] args) throws IgniteException { + Ignition.start("examples/config/example-ignite.xml"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/ExamplesUtils.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ExamplesUtils.java b/examples/src/main/java/org/apache/ignite/examples/ExamplesUtils.java new file mode 100644 index 0000000..abc2578 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/ExamplesUtils.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; + +import java.net.*; +import java.util.*; + +/** + * + */ +public class ExamplesUtils { + /** */ + private static final ClassLoader CLS_LDR = ExamplesUtils.class.getClassLoader(); + + /** + * Exits with code {@code -1} if maximum memory is below 90% of minimally allowed threshold. + * + * @param min Minimum memory threshold. + */ + public static void checkMinMemory(long min) { + long maxMem = Runtime.getRuntime().maxMemory(); + + if (maxMem < .85 * min) { + System.err.println("Heap limit is too low (" + (maxMem / (1024 * 1024)) + + "MB), please increase heap size at least up to " + (min / (1024 * 1024)) + "MB."); + + System.exit(-1); + } + } + + /** + * Returns URL resolved by class loader for classes in examples project. + * + * @return Resolved URL. + */ + public static URL url(String path) { + URL url = CLS_LDR.getResource(path); + + if (url == null) + throw new RuntimeException("Failed to resolve resource URL by path: " + path); + + return url; + } + + /** + * Checks minimum topology size for running a certain example. + * + * @param grp Cluster to check size for. + * @param size Minimum number of nodes required to run a certain example. + * @return {@code True} if check passed, {@code false} otherwise. + */ + public static boolean checkMinTopologySize(ClusterGroup grp, int size) { + int prjSize = grp.nodes().size(); + + if (prjSize < size) { + System.err.println(">>> Please start at least " + size + " cluster nodes to run example."); + + return false; + } + + return true; + } + + /** + * Checks if cluster has server nodes. + * + * @param ignite Ignite instance. + * @return {@code True} if cluster has server nodes, {@code false} otherwise. + */ + public static boolean hasServerNodes(Ignite ignite) { + if (ignite.cluster().forServers().nodes().isEmpty()) { + System.err.println("Server nodes not found (start data nodes with ExampleNodeStartup class)"); + + return false; + } + + return true; + } + + /** + * Convenience method for printing query results. + * + * @param res Query results. + */ + public static void printQueryResults(List<?> res) { + if (res == null || res.isEmpty()) + System.out.println("Query result set is empty."); + else { + for (Object row : res) { + if (row instanceof List) { + System.out.print("("); + + List<?> l = (List)row; + + for (int i = 0; i < l.size(); i++) { + Object o = l.get(i); + + if (o instanceof Double || o instanceof Float) + System.out.printf("%.2f", o); + else + System.out.print(l.get(i)); + + if (i + 1 != l.size()) + System.out.print(','); + } + + System.out.println(')'); + } + else + System.out.println(" " + row); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeBroadcastExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeBroadcastExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeBroadcastExample.java new file mode 100644 index 0000000..2ea878c --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeBroadcastExample.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; + +import java.util.*; + +/** + * Demonstrates broadcasting computations within cluster. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeBroadcastExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Compute broadcast example started."); + + // Print hello message on all nodes. + hello(ignite); + + // Gather system info from all nodes. + gatherSystemInfo(ignite); + } + } + + /** + * Print 'Hello' message on all nodes. + * + * @param ignite Ignite instance. + * @throws IgniteException If failed. + */ + private static void hello(Ignite ignite) throws IgniteException { + // Print out hello message on all nodes. + ignite.compute().broadcast( + new IgniteRunnable() { + @Override public void run() { + System.out.println(); + System.out.println(">>> Hello Node! :)"); + } + } + ); + + System.out.println(); + System.out.println(">>> Check all nodes for hello message output."); + } + + /** + * Gather system info from all nodes and print it out. + * + * @param ignite Ignite instance. + * @throws IgniteException if failed. + */ + private static void gatherSystemInfo(Ignite ignite) throws IgniteException { + // Gather system info from all nodes. + Collection<String> res = ignite.compute().broadcast( + new IgniteCallable<String>() { + // Automatically inject ignite instance. + @IgniteInstanceResource + private Ignite ignite; + + @Override public String call() { + System.out.println(); + System.out.println("Executing task on node: " + ignite.cluster().localNode().id()); + + return "Node ID: " + ignite.cluster().localNode().id() + "\n" + + "OS: " + System.getProperty("os.name") + " " + System.getProperty("os.version") + " " + + System.getProperty("os.arch") + "\n" + + "User: " + System.getProperty("user.name") + "\n" + + "JRE: " + System.getProperty("java.runtime.name") + " " + + System.getProperty("java.runtime.version"); + } + }); + + // Print result. + System.out.println(); + System.out.println("Nodes system information:"); + System.out.println(); + + for (String r : res) { + System.out.println(r); + System.out.println(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeCallableExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeCallableExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeCallableExample.java new file mode 100644 index 0000000..f9a2939 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeCallableExample.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Demonstrates using of {@link IgniteCallable} job execution on the cluster. + * <p> + * This example takes a sentence composed of multiple words and counts number of non-space + * characters in the sentence by having each compute job count characters in each individual + * word. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeCallableExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Compute callable example started."); + + Collection<IgniteCallable<Integer>> calls = new ArrayList<>(); + + // Iterate through all words in the sentence and create callable jobs. + for (final String word : "Count characters using callable".split(" ")) { + calls.add(new IgniteCallable<Integer>() { + @Override public Integer call() throws Exception { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + + return word.length(); + } + }); + } + + // Execute collection of callables on the ignite. + Collection<Integer> res = ignite.compute().call(calls); + + int sum = 0; + + // Add up individual word lengths received from remote nodes. + for (int len : res) + sum += len; + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + sum + "'."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeClosureExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeClosureExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeClosureExample.java new file mode 100644 index 0000000..0efb4cc --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeClosureExample.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Demonstrates a simple use of Ignite with reduce closure. + * <p> + * This example splits a phrase into collection of words, computes their length on different + * nodes and then computes total amount of non-whitespaces characters in the phrase. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeClosureExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Compute closure example started."); + + // Execute closure on all cluster nodes. + Collection<Integer> res = ignite.compute().apply( + new IgniteClosure<String, Integer>() { + @Override public Integer apply(String word) { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + + // Return number of letters in the word. + return word.length(); + } + }, + + // Job parameters. Ignite will create as many jobs as there are parameters. + Arrays.asList("Count characters using closure".split(" ")) + ); + + int sum = 0; + + // Add up individual word lengths received from remote nodes + for (int len : res) + sum += len; + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + sum + "'."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeClusterGroupsExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeClusterGroupsExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeClusterGroupsExample.java new file mode 100644 index 0000000..40f8598 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeClusterGroupsExample.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.lang.*; + +/** + * Demonstrates new functional APIs. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeClusterGroupsExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2)) + return; + + System.out.println(); + System.out.println("Compute example started."); + + IgniteCluster cluster = ignite.cluster(); + + // Say hello to all nodes in the cluster, including local node. + sayHello(ignite, cluster); + + // Say hello to all remote nodes. + sayHello(ignite, cluster.forRemotes()); + + // Pick random node out of remote nodes. + ClusterGroup randomNode = cluster.forRemotes().forRandom(); + + // Say hello to a random node. + sayHello(ignite, randomNode); + + // Say hello to all nodes residing on the same host with random node. + sayHello(ignite, cluster.forHost(randomNode.node())); + + // Say hello to all nodes that have current CPU load less than 50%. + sayHello(ignite, cluster.forPredicate(new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode n) { + return n.metrics().getCurrentCpuLoad() < 0.5; + } + })); + } + } + + /** + * Print 'Hello' message on remote nodes. + * + * @param ignite Ignite. + * @param grp Cluster group. + * @throws IgniteException If failed. + */ + private static void sayHello(Ignite ignite, final ClusterGroup grp) throws IgniteException { + // Print out hello message on all cluster nodes. + ignite.compute(grp).broadcast( + new IgniteRunnable() { + @Override public void run() { + // Print ID of remote node on remote node. + System.out.println(">>> Hello Node: " + grp.ignite().cluster().localNode().id()); + } + } + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeContinuousMapperExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeContinuousMapperExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeContinuousMapperExample.java new file mode 100644 index 0000000..715001e --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeContinuousMapperExample.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.resources.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Demonstrates usage of continuous mapper. With continuous mapper + * it is possible to continue mapping jobs asynchronously even after + * initial {@link ComputeTask#map(List, Object)} method completes. + * <p> + * String "Hello Continuous Mapper" is passed as an argument for execution + * of {@link ComputeContinuousMapperExample.ContinuousMapperTask}. As an outcome, participating + * nodes will print out a single word from the passed in string and return + * number of characters in that word. However, to demonstrate continuous + * mapping, next word will be mapped to a node only after the result from + * previous word has been received. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeContinuousMapperExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + System.out.println(); + System.out.println(">>> Compute continuous mapper example started."); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + int phraseLen = ignite.compute().execute(ContinuousMapperTask.class, "Hello Continuous Mapper"); + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + phraseLen + "'."); + } + } + + /** + * This task demonstrates how continuous mapper is used. The passed in phrase + * is split into multiple words and next word is sent out for processing only + * when the result for the previous word was received. + * <p> + * Note that annotation {@link ComputeTaskNoResultCache} is optional and tells Ignite + * not to accumulate results from individual jobs. In this example we increment + * total character count directly in {@link #result(ComputeJobResult, List)} method, + * and therefore don't need to accumulate them be be processed at reduction step. + */ + @ComputeTaskNoResultCache + private static class ContinuousMapperTask extends ComputeTaskAdapter<String, Integer> { + /** This field will be injected with task continuous mapper. */ + @TaskContinuousMapperResource + private ComputeTaskContinuousMapper mapper; + + /** Word queue. */ + private final Queue<String> words = new ConcurrentLinkedQueue<>(); + + /** Total character count. */ + private final AtomicInteger totalChrCnt = new AtomicInteger(0); + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, String phrase) { + if (phrase == null || phrase.isEmpty()) + throw new IgniteException("Phrase is empty."); + + // Populate word queue. + Collections.addAll(words, phrase.split(" ")); + + // Sends first word. + sendWord(); + + // Since we have sent at least one job, we are allowed to return + // 'null' from map method. + return null; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + // If there is an error, fail-over to another node. + if (res.getException() != null) + return super.result(res, rcvd); + + // Add result to total character count. + totalChrCnt.addAndGet(res.<Integer>getData()); + + sendWord(); + + // If next word was sent, keep waiting, otherwise work queue is empty and we reduce. + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) { + return totalChrCnt.get(); + } + + /** + * Sends next queued word to the next node implicitly selected by load balancer. + */ + private void sendWord() { + // Remove first word from the queue. + String word = words.poll(); + + if (word != null) { + // Map next word. + mapper.send(new ComputeJobAdapter(word) { + @Override public Object execute() { + String word = argument(0); + + System.out.println(); + System.out.println(">>> Printing '" + word + "' from ignite job at time: " + new Date()); + + int cnt = word.length(); + + // Sleep for some time so it will be visually noticeable that + // jobs are executed sequentially. + try { + Thread.sleep(1000); + } + catch (InterruptedException ignored) { + // No-op. + } + + return cnt; + } + }); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeExecutorServiceExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeExecutorServiceExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeExecutorServiceExample.java new file mode 100644 index 0000000..3b9be86 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeExecutorServiceExample.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.lang.*; + +import java.util.concurrent.*; + +/** + * Simple example to demonstrate usage of distributed executor service provided by Ignite. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public final class ComputeExecutorServiceExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws Exception If example execution failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Compute executor service example started."); + + // Get ignite-enabled executor service. + ExecutorService exec = ignite.executorService(); + + // Iterate through all words in the sentence and create callable jobs. + for (final String word : "Print words using runnable".split(" ")) { + // Execute runnable on some node. + exec.submit(new IgniteRunnable() { + @Override public void run() { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + } + }); + } + + exec.shutdown(); + + // Wait for all jobs to complete (0 means no limit). + exec.awaitTermination(0, TimeUnit.MILLISECONDS); + + System.out.println(); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java new file mode 100644 index 0000000..ff2fae9 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.jetbrains.annotations.*; + +import java.math.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * This example demonstrates how to use continuation feature of Ignite by + * performing the distributed recursive calculation of {@code 'Fibonacci'} + * numbers on the cluster. Continuations + * functionality is exposed via {@link ComputeJobContext#holdcc()} and + * {@link ComputeJobContext#callcc()} method calls in {@link ComputeFibonacciContinuationExample.FibonacciClosure} class. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public final class ComputeFibonacciContinuationExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute Fibonacci continuation example started."); + + long N = 100; + + final UUID exampleNodeId = ignite.cluster().localNode().id(); + + // Filter to exclude this node from execution. + final IgnitePredicate<ClusterNode> nodeFilter = new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode n) { + // Give preference to remote nodes. + return ignite.cluster().forRemotes().nodes().isEmpty() || !n.id().equals(exampleNodeId); + } + }; + + long start = System.currentTimeMillis(); + + BigInteger fib = ignite.compute(ignite.cluster().forPredicate(nodeFilter)).apply(new FibonacciClosure(nodeFilter), N); + + long duration = System.currentTimeMillis() - start; + + System.out.println(); + System.out.println(">>> Finished executing Fibonacci for '" + N + "' in " + duration + " ms."); + System.out.println(">>> Fibonacci sequence for input number '" + N + "' is '" + fib + "'."); + System.out.println(">>> If you re-run this example w/o stopping remote nodes - the performance will"); + System.out.println(">>> increase since intermediate results are pre-cache on remote nodes."); + System.out.println(">>> You should see prints out every recursive Fibonacci execution on cluster nodes."); + System.out.println(">>> Check remote nodes for output."); + } + } + + /** + * Closure to execute. + */ + private static class FibonacciClosure implements IgniteClosure<Long, BigInteger> { + /** Future for spawned task. */ + private IgniteFuture<BigInteger> fut1; + + /** Future for spawned task. */ + private IgniteFuture<BigInteger> fut2; + + /** Auto-inject job context. */ + @JobContextResource + private ComputeJobContext jobCtx; + + /** Auto-inject ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Predicate. */ + private final IgnitePredicate<ClusterNode> nodeFilter; + + /** + * @param nodeFilter Predicate to filter nodes. + */ + FibonacciClosure(IgnitePredicate<ClusterNode> nodeFilter) { + this.nodeFilter = nodeFilter; + } + + /** {@inheritDoc} */ + @Nullable @Override public BigInteger apply(Long n) { + if (fut1 == null || fut2 == null) { + System.out.println(); + System.out.println(">>> Starting fibonacci execution for number: " + n); + + // Make sure n is not negative. + n = Math.abs(n); + + if (n <= 2) + return n == 0 ? BigInteger.ZERO : BigInteger.ONE; + + // Node-local storage. + ConcurrentMap<Long, IgniteFuture<BigInteger>> locMap = ignite.cluster().nodeLocalMap(); + + // Check if value is cached in node-local-map first. + fut1 = locMap.get(n - 1); + fut2 = locMap.get(n - 2); + + ClusterGroup p = ignite.cluster().forPredicate(nodeFilter); + + IgniteCompute compute = ignite.compute(p).withAsync(); + + // If future is not cached in node-local-map, cache it. + if (fut1 == null) { + compute.apply(new FibonacciClosure(nodeFilter), n - 1); + + ComputeTaskFuture<BigInteger> futVal = compute.future(); + + fut1 = locMap.putIfAbsent(n - 1, futVal); + + if (fut1 == null) + fut1 = futVal; + } + + // If future is not cached in node-local-map, cache it. + if (fut2 == null) { + compute.apply(new FibonacciClosure(nodeFilter), n - 2); + + ComputeTaskFuture<BigInteger> futVal = compute.<BigInteger>future(); + + fut2 = locMap.putIfAbsent(n - 2, futVal); + + if (fut2 == null) + fut2 = futVal; + } + + // If futures are not done, then wait asynchronously for the result + if (!fut1.isDone() || !fut2.isDone()) { + IgniteInClosure<IgniteFuture<BigInteger>> lsnr = new IgniteInClosure<IgniteFuture<BigInteger>>() { + @Override public void apply(IgniteFuture<BigInteger> f) { + // If both futures are done, resume the continuation. + if (fut1.isDone() && fut2.isDone()) + // CONTINUATION: + // ============= + // Resume suspended job execution. + jobCtx.callcc(); + } + }; + + // CONTINUATION: + // ============= + // Hold (suspend) job execution. + // It will be resumed in listener above via 'callcc()' call + // once both futures are done. + jobCtx.holdcc(); + + // Attach the same listener to both futures. + fut1.listen(lsnr); + fut2.listen(lsnr); + + return null; + } + } + + assert fut1.isDone() && fut2.isDone(); + + // Return cached results. + return fut1.get().add(fut2.get()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeReducerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeReducerExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeReducerExample.java new file mode 100644 index 0000000..b678d45 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeReducerExample.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.lang.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Demonstrates a simple use of Ignite with reduce closure. + * <p> + * Phrase is split into words and distributed across nodes where length of each word is + * calculated. Then total phrase length is calculated using reducer. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start Ignite node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeReducerExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute reducer example started."); + + Integer sum = ignite.compute().apply( + new IgniteClosure<String, Integer>() { + @Override public Integer apply(String word) { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + + // Return number of letters in the word. + return word.length(); + } + }, + + // Job parameters. Ignite will create as many jobs as there are parameters. + Arrays.asList("Count characters using reducer".split(" ")), + + // Reducer to process results as they come. + new IgniteReducer<Integer, Integer>() { + private AtomicInteger sum = new AtomicInteger(); + + // Callback for every job result. + @Override public boolean collect(Integer len) { + sum.addAndGet(len); + + // Return true to continue waiting until all results are received. + return true; + } + + // Reduce all results into one. + @Override public Integer reduce() { + return sum.get(); + } + } + ); + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + sum + "'."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeRunnableExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeRunnableExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeRunnableExample.java new file mode 100644 index 0000000..1d907e3 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeRunnableExample.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Demonstrates a simple use of {@link IgniteRunnable}. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeRunnableExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute runnable example started."); + + Collection<IgniteFuture> futs = new ArrayList<>(); + + // Enable asynchronous mode. + IgniteCompute compute = ignite.compute().withAsync(); + + // Iterate through all words in the sentence and create callable jobs. + for (final String word : "Print words using runnable".split(" ")) { + // Execute runnable on some node. + compute.run(new IgniteRunnable() { + @Override public void run() { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + } + }); + + futs.add(compute.future()); + } + + // Wait for all futures to complete. + for (IgniteFuture<?> f : futs) + f.get(); + + System.out.println(); + System.out.println(">>> Finished printing words using runnable execution."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeScheduleExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeScheduleExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeScheduleExample.java new file mode 100644 index 0000000..17812ec --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeScheduleExample.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.scheduler.*; + +import java.util.concurrent.*; + +/** + * Demonstrates a cron-based {@link Runnable} execution scheduling. + * Test runnable object broadcasts a phrase to all cluster nodes every minute + * three times with initial scheduling delay equal to five seconds. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeScheduleExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute schedule example started."); + + // Schedule output message every minute. + SchedulerFuture<?> fut = ignite.scheduler().scheduleLocal( + new Callable<Integer>() { + private int invocations; + + @Override public Integer call() { + invocations++; + + ignite.compute().broadcast( + new IgniteRunnable() { + @Override public void run() { + System.out.println(); + System.out.println("Howdy! :) "); + } + } + ); + + return invocations; + } + }, + "{5, 3} * * * * *" // Cron expression. + ); + + while (!fut.isDone()) + System.out.println(">>> Invocation #: " + fut.get()); + + System.out.println(); + System.out.println(">>> Schedule future is done and has been unscheduled."); + System.out.println(">>> Check all nodes for hello message output."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskMapExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskMapExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskMapExample.java new file mode 100644 index 0000000..a06299f --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskMapExample.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.examples.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Demonstrates a simple use of Ignite with + * {@link ComputeTaskAdapter}. + * <p> + * Phrase passed as task argument is split into words on map stage and distributed among cluster nodes. + * Each node computes word length and returns result to master node where total phrase length is + * calculated on reduce stage. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeTaskMapExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute task map example started."); + + // Execute task on the cluster and wait for its completion. + int cnt = ignite.compute().execute(CharacterCountTask.class, "Hello Ignite Enabled World!"); + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + cnt + "'."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } + + /** + * Task to count non-white-space characters in a phrase. + */ + private static class CharacterCountTask extends ComputeTaskAdapter<String, Integer> { + /** + * Splits the received string to words, creates a child job for each word, and sends + * these jobs to other nodes for processing. Each such job simply prints out the received word. + * + * @param nodes Nodes available for this task execution. + * @param arg String to split into words for processing. + * @return Map of jobs to nodes. + */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, String arg) { + String[] words = arg.split(" "); + + Map<ComputeJob, ClusterNode> map = new HashMap<>(); + + Iterator<ClusterNode> it = nodes.iterator(); + + for (final String word : arg.split(" ")) { + // If we used all nodes, restart the iterator. + if (!it.hasNext()) + it = nodes.iterator(); + + ClusterNode node = it.next(); + + map.put(new ComputeJobAdapter() { + @Nullable @Override public Object execute() { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + + // Return number of letters in the word. + return word.length(); + } + }, node); + } + + return map; + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer reduce(List<ComputeJobResult> results) { + int sum = 0; + + for (ComputeJobResult res : results) + sum += res.<Integer>getData(); + + return sum; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskSplitExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskSplitExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskSplitExample.java new file mode 100644 index 0000000..3dd3f63 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskSplitExample.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.examples.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Demonstrates a simple use of Ignite with {@link ComputeTaskSplitAdapter}. + * <p> + * Phrase passed as task argument is split into jobs each taking one word. Then jobs are distributed among + * cluster nodes. Each node computes word length and returns result to master node where total phrase length + * is calculated on reduce stage. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-ignite.xml} configuration. + */ +public class ComputeTaskSplitExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println("Compute task split example started."); + + // Execute task on the cluster and wait for its completion. + int cnt = ignite.compute().execute(CharacterCountTask.class, "Hello Ignite Enabled World!"); + + System.out.println(); + System.out.println(">>> Total number of characters in the phrase is '" + cnt + "'."); + System.out.println(">>> Check all nodes for output (this node is also part of the cluster)."); + } + } + + /** + * Task to count non-white-space characters in a phrase. + */ + private static class CharacterCountTask extends ComputeTaskSplitAdapter<String, Integer> { + /** + * Splits the received string to words, creates a child job for each word, and sends + * these jobs to other nodes for processing. Each such job simply prints out the received word. + * + * @param clusterSize Number of available cluster nodes. Note that returned number of + * jobs can be less, equal or greater than this cluster size. + * @param arg Task execution argument. Can be {@code null}. + * @return The list of child jobs. + */ + @Override protected Collection<? extends ComputeJob> split(int clusterSize, String arg) { + Collection<ComputeJob> jobs = new LinkedList<>(); + + for (final String word : arg.split(" ")) { + jobs.add(new ComputeJobAdapter() { + @Nullable @Override public Object execute() { + System.out.println(); + System.out.println(">>> Printing '" + word + "' on this node from ignite job."); + + // Return number of letters in the word. + return word.length(); + } + }); + } + + return jobs; + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer reduce(List<ComputeJobResult> results) { + int sum = 0; + + for (ComputeJobResult res : results) + sum += res.<Integer>getData(); + + return sum; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b473e309/examples/src/main/java/org/apache/ignite/examples/computegrid/failover/ComputeFailoverExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/failover/ComputeFailoverExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/failover/ComputeFailoverExample.java new file mode 100644 index 0000000..9b87c70 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/failover/ComputeFailoverExample.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.computegrid.failover; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; + +import java.util.*; + +/** + * Demonstrates the usage of checkpoints in Ignite. + * <p> + * The example tries to compute phrase length. In order to mitigate possible node failures, intermediate + * result is saved as as checkpoint after each job step. + * <p> + * Remote nodes must be started using {@link ComputeFailoverNodeStartup}. + */ +public class ComputeFailoverExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException{ + try (Ignite ignite = Ignition.start(ComputeFailoverNodeStartup.configuration())) { + if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2)) + return; + + System.out.println(); + System.out.println("Compute failover example started."); + + // Number of letters. + int charCnt = ignite.compute().apply(new CheckPointJob(), "Stage1 Stage2"); + + System.out.println(); + System.out.println(">>> Finished executing fail-over example with checkpoints."); + System.out.println(">>> Total number of characters in the phrase is '" + charCnt + "'."); + System.out.println(">>> You should see exception stack trace from failed job on some node."); + System.out.println(">>> Failed job will be failed over to another node."); + } + } + + @ComputeTaskSessionFullSupport + private static final class CheckPointJob implements IgniteClosure<String, Integer> { + /** Injected distributed task session. */ + @TaskSessionResource + private ComputeTaskSession jobSes; + + /** Injected ignite logger. */ + @LoggerResource + private IgniteLogger log; + + /** */ + private IgniteBiTuple<Integer, Integer> state; + + /** */ + private String phrase; + + /** + * The job will check the checkpoint with key '{@code fail}' and if + * it's {@code true} it will throw exception to simulate a failure. + * Otherwise, it will execute enabled method. + */ + @Override public Integer apply(String phrase) { + System.out.println(); + System.out.println(">>> Executing fail-over example job."); + + this.phrase = phrase; + + List<String> words = Arrays.asList(phrase.split(" ")); + + final String cpKey = checkpointKey(); + + IgniteBiTuple<Integer, Integer> state = jobSes.loadCheckpoint(cpKey); + + int idx = 0; + int sum = 0; + + if (state != null) { + this.state = state; + + // Last processed word index and total length. + idx = state.get1(); + sum = state.get2(); + } + + for (int i = idx; i < words.size(); i++) { + sum += words.get(i).length(); + + this.state = new IgniteBiTuple<>(i + 1, sum); + + // Save checkpoint with scope of task execution. + // It will be automatically removed when task completes. + jobSes.saveCheckpoint(cpKey, this.state); + + // For example purposes, we fail on purpose after first stage. + // This exception will cause job to be failed over to another node. + if (i == 0) { + System.out.println(); + System.out.println(">>> Job will be failed over to another node."); + + throw new ComputeJobFailoverException("Expected example job exception."); + } + } + + return sum; + } + + /** + * Make reasonably unique checkpoint key. + * + * @return Checkpoint key. + */ + private String checkpointKey() { + return getClass().getName() + '-' + phrase; + } + } +}