# sprint-3 - Updated streaming examples.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b2984c9b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b2984c9b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b2984c9b Branch: refs/heads/ignite-sprint-3 Commit: b2984c9bf4a2563cfd943def1c94d68e9b75aef6 Parents: 5a2b46c Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Mon Apr 6 23:15:55 2015 -0700 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Mon Apr 6 23:15:55 2015 -0700 ---------------------------------------------------------------------- examples/README.txt | 11 +- examples/config/example-ignite.xml | 2 +- .../streaming/marketdata/CacheConfig.java | 44 - .../streaming/marketdata/Instrument.java | 106 - .../marketdata/QueryTopInstruments.java | 73 - .../streaming/marketdata/StreamMarketData.java | 103 - .../examples/streaming/numbers/CacheConfig.java | 46 - .../streaming/numbers/QueryPopularNumbers.java | 74 - .../streaming/numbers/StreamRandomNumbers.java | 78 - .../transformers/StreamTransformerExample.java | 101 + .../visitors/StreamVisitorExample.java | 176 + .../streaming/wordcount/CacheConfig.java | 48 + .../streaming/wordcount/QueryWords.java | 77 + .../streaming/wordcount/StreamWords.java | 68 + .../streaming/wordcount/alice-in-wonderland.txt | 3735 ++++++++++++++++++ .../java8/streaming/marketdata/CacheConfig.java | 44 - .../java8/streaming/marketdata/Instrument.java | 106 - .../marketdata/QueryTopInstruments.java | 73 - .../streaming/marketdata/StreamMarketData.java | 101 - .../java8/streaming/numbers/CacheConfig.java | 46 - .../streaming/numbers/QueryPopularNumbers.java | 74 - .../streaming/numbers/StreamRandomNumbers.java | 74 - .../transformers/StreamTransformerExample.java | 97 + .../visitors/StreamVisitorExample.java | 173 + .../ignite/cache/affinity/AffinityUuid.java | 47 + 25 files changed, 4532 insertions(+), 1045 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/README.txt ---------------------------------------------------------------------- diff --git a/examples/README.txt b/examples/README.txt index ce6b238..c75cbb3 100644 --- a/examples/README.txt +++ b/examples/README.txt @@ -1,5 +1,5 @@ Apache Ignite Examples ----------------------- +====================== This folder contains code examples for various Apache Ignite functionality. @@ -18,8 +18,15 @@ The examples folder contains he following subfolders: are excluded by default, enable `java8-examples` Maven profile to include them (JDK8 is required). - `src/main/scala` - contains examples demonstrating usage of API provided by Scalar. + Starting Remote Nodes ---------------------- +===================== Remote nodes for examples should always be started with special configuration file which enables P2P class loading: `examples/config/example-ignite.xml`. To run a remote node in IDE use `ExampleNodeStartup` class. + + +Java7 vs Java8 +============== +Some examples (not all) which can benefit from Java8 Lambda support were re-written with Java8 lambdas. +For full set of examples, look at both Java7 and Java8 packages. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/config/example-ignite.xml ---------------------------------------------------------------------- diff --git a/examples/config/example-ignite.xml b/examples/config/example-ignite.xml index e7adb54..e746e59 100644 --- a/examples/config/example-ignite.xml +++ b/examples/config/example-ignite.xml @@ -68,7 +68,7 @@ --> <!-- Uncomment static IP finder to enable static-based discovery of initial nodes.
--> <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> - <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java deleted file mode 100644 index f26ffb7..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.marketdata; - -import org.apache.ignite.configuration.*; - -/** - * Configuration for the streaming caches for market data and financial instruments. 
- */ -public class CacheConfig { - /** - * Configure streaming cache for market ticks. - */ - public static CacheConfiguration<String, Double> marketTicksCache() { - return new CacheConfiguration<>("marketTicks"); - } - - /** - * Configure cache for financial instruments. - */ - public static CacheConfiguration<String, Instrument> instrumentCache() { - CacheConfiguration<String, Instrument> instCache = new CacheConfiguration<>("instCache"); - - // Index some fields for querying portfolio positions. - instCache.setIndexedTypes(String.class, Instrument.class); - - return instCache; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java deleted file mode 100644 index 6daaffe..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.examples.streaming.marketdata; - -import org.apache.ignite.cache.query.annotations.*; - -import java.io.*; - -/** - * Financial instrument. - */ -public class Instrument implements Serializable { - /** Instrument symbol. */ - @QuerySqlField(index = true) - private final String symbol; - - /** Open price. */ - @QuerySqlField(index = true) - private double open; - - /** High price. */ - private double high; - - /** Low price. */ - private double low = Long.MAX_VALUE; - - /** Close price. */ - @QuerySqlField(index = true) - private double latest; - - /** - * @param symbol Symbol. - */ - Instrument(String symbol) { - this.symbol = symbol; - } - - /** - * Updates this instrument based on the latest price. - * - * @param price Latest price. - */ - public void update(double price) { - if (open == 0) - open = price; - - high = Math.max(high, price); - low = Math.min(low, price); - this.latest = price; - } - - /** - * @return Symbol. - */ - public String symbol() { - return symbol; - } - - /** - * @return Open price. - */ - public double open() { - return open; - } - - /** - * @return High price. - */ - public double high() { - return high; - } - - /** - * @return Low price. - */ - public double low() { - return low; - } - - /** - * @return Close price. 
- */ - public double latest() { - return latest; - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest - open) + ']'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/QueryTopInstruments.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/QueryTopInstruments.java b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/QueryTopInstruments.java deleted file mode 100644 index 4bcb2ba..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/QueryTopInstruments.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.marketdata; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.examples.*; - -import java.util.*; - -/** - * Periodically query popular numbers from the streaming cache. 
- * To start the example, you should: - * <ul> - * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> - * <li>Start streaming using {@link StreamMarketData}.</li> - * <li>Start querying top performing instruments using {@link QueryTopInstruments}.</li> - * </ul> - * <p> - * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class QueryTopInstruments { - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); - - // Select top 3 instruments. - SqlFieldsQuery top3qry = new SqlFieldsQuery( - "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); - - // Select total profit. - SqlFieldsQuery profitQry = new SqlFieldsQuery("select sum(latest - open) from Instrument"); - - // Query top 3 best performing instruments every 5 seconds. - while (true) { - // Execute queries. - List<List<?>> top3 = instCache.query(top3qry).getAll(); - List<List<?>> profit = instCache.query(profitQry).getAll(); - - List<?> row = profit.get(0); - - if (row.get(0) != null) - System.out.printf("Total profit: %.2f%n", row.get(0)); - - // Print top 10 words. 
- ExamplesUtils.printQueryResults(top3); - - Thread.sleep(5000); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java deleted file mode 100644 index bfac698..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.marketdata; - -import org.apache.ignite.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.stream.*; - -import java.util.*; - -/** - * Stream random numbers into the streaming cache. 
- * To start the example, you should: - * <ul> - * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> - * <li>Start streaming using {@link StreamMarketData}.</li> - * <li>Start querying top performing instruments using {@link QueryTopInstruments}.</li> - * </ul> - * <p> - * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class StreamMarketData { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** The list of instruments. */ - private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; - - /** The list of initial instrument prices. */ - private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; - - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache()); - final IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); - - try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) { - // Note that we receive market data, but do not populate 'mktCache' (it remains empty). - // Instead we update the instruments in the 'instCache'. 
- mktStmr.receiver(new StreamVisitor<String, Double>() { - @Override public void apply(IgniteCache<String, Double> cache, Map.Entry<String, Double> e) { - String symbol = e.getKey(); - Double tick = e.getValue(); - - Instrument inst = instCache.get(symbol); - - if (inst == null) - inst = new Instrument(symbol); - - // Don't populate market cache, as we don't use it for querying. - // Update cached instrument based on the latest market tick. - inst.update(tick); - - instCache.put(symbol, inst); - } - }); - - // Stream market data into market data stream cache. - while (true) { - for (int j = 0; j < INSTRUMENTS.length; j++) { - // Use gaussian distribution to ensure that - // numbers closer to 0 have higher probability. - double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian()); - - mktStmr.addData(INSTRUMENTS[j], price); - } - } - } - } - } - - /** - * Rounds double value to two significant signs. - * - * @param val value to be rounded. - * @return rounded double value. - */ - private static double round2(double val) { - return Math.floor(100 * val + 0.5) / 100; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/CacheConfig.java b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/CacheConfig.java deleted file mode 100644 index 58592e1..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/CacheConfig.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.numbers; - -import org.apache.ignite.configuration.*; - -import javax.cache.configuration.*; -import javax.cache.expiry.*; - -import static java.util.concurrent.TimeUnit.*; - -/** - * Configuration for the streaming cache to store the stream of random numbers. - * This cache is configured with sliding window of 1 second, which means that - * data older than 1 second will be automatically removed from the cache. - */ -public class CacheConfig { - /** - * Configure streaming cache. - */ - public static CacheConfiguration<Integer, Long> randomNumbersCache() { - CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers"); - - cfg.setIndexedTypes(Integer.class, Long.class); - - // Sliding window of 1 seconds. 
- cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 1)))); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/QueryPopularNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/QueryPopularNumbers.java b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/QueryPopularNumbers.java deleted file mode 100644 index e08ae09..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/QueryPopularNumbers.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.numbers; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.examples.*; - -import java.util.*; - -/** - * Periodically query popular numbers from the streaming cache. 
- * To start the example, you should: - * <ul> - * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> - * <li>Start streaming using {@link StreamRandomNumbers}.</li> - * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> - * </ul> - * <p> - * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class QueryPopularNumbers { - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); - - // Select top 10 words. - SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); - - // Select average, min, and max counts among all the words. - SqlFieldsQuery statsQry = new SqlFieldsQuery("select avg(_val), min(_val), max(_val) from Long"); - - // Query top 10 popular numbers every 5 seconds. - while (true) { - // Execute queries. - List<List<?>> top10 = stmCache.query(top10Qry).getAll(); - List<List<?>> stats = stmCache.query(statsQry).getAll(); - - // Print average count. - List<?> row = stats.get(0); - - if (row.get(0) != null) - System.out.printf("Query results [avg=%.2f, min=%d, max=%d]%n", row.get(0), row.get(1), row.get(2)); - - // Print top 10 words. 
- ExamplesUtils.printQueryResults(top10); - - Thread.sleep(5000); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java deleted file mode 100644 index 02030fb..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.numbers; - -import org.apache.ignite.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.stream.*; - -import javax.cache.processor.*; -import java.util.*; - -/** - * Stream random numbers into the streaming cache. 
- * To start the example, you should: - * <ul> - * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> - * <li>Start streaming using {@link StreamRandomNumbers}.</li> - * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> - * </ul> - * <p> - * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class StreamRandomNumbers { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Range within which to generate numbers. */ - private static final int RANGE = 1000; - - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); - - try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { - // Allow data updates. - stmr.allowOverwrite(true); - - // Configure data transformation to count instances of the same number. - stmr.receiver(new StreamTransformer<Integer, Long>() { - @Override public Object process(MutableEntry<Integer, Long> e, Object... objects) - throws EntryProcessorException { - Long val = e.getValue(); - - e.setValue(val == null ? 1L : val + 1); - - return null; - } - }); - - // Stream random numbers into the streamer cache. 
- while (true) - stmr.addData(RAND.nextInt(RANGE), 1L); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java new file mode 100644 index 0000000..b2fc89d --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming.transformers; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.stream.*; + +import javax.cache.processor.*; +import java.util.*; + +/** + * Stream random numbers into the streaming cache. 
+ * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamTransformerExample}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamTransformerExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers"); + + // Index key and value. + cfg.setIndexedTypes(Integer.class, Long.class); + + // Auto-close cache at the end of the example. + try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) { + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same number. + stmr.receiver(new StreamTransformer<Integer, Long>() { + @Override + public Object process(MutableEntry<Integer, Long> e, Object... args) { + // Get current count. + Long val = e.getValue(); + + // Increment count by 1. + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + + // Stream 10 million random numbers into the streamer cache. + for (int i = 1; i <= 10_000_000; i++) { + stmr.addData(RAND.nextInt(RANGE), 1L); + + if (i % 500_000 == 0) + System.out.println("Number of tuples streamed into Ignite: " + i); + } + } + + // Query top 10 most popular numbers.
+ SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); + + // Execute queries. + List<List<?>> top10 = stmCache.query(top10Qry).getAll(); + + System.out.println("Top 10 most popular numbers:"); + + // Print top 10 numbers. + ExamplesUtils.printQueryResults(top10); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java new file mode 100644 index 0000000..cd973ea --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.examples.streaming.visitors; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.stream.*; + +import java.io.*; +import java.util.*; + +/** + * Stream random market data ticks and use them to update financial instruments in the cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamVisitorExample}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamVisitorExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** The list of instruments. */ + private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; + + /** The list of initial instrument prices. */ + private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // Market data cache with default configuration. + CacheConfiguration<String, Double> mktDataCfg = new CacheConfiguration<String, Double>("marketTicks"); + + // Financial instrument cache configuration. + CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache"); + + // Index key and value for querying financial instruments. + // Note that Instrument class has @QuerySqlField annotation for secondary field indexing.
+ instCfg.setIndexedTypes(String.class, Instrument.class); + + // Auto-close caches at the end of the example. + try ( + IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(mktDataCfg); + IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg) + ) { + try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) { + // Note that we receive market data, but do not populate 'mktCache' (it remains empty). + // Instead we update the instruments in the 'instCache'. + // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated. + mktStmr.receiver(new StreamVisitor<String, Double>() { + @Override + public void apply(IgniteCache<String, Double> cache, Map.Entry<String, Double> e) { + String symbol = e.getKey(); + Double tick = e.getValue(); + + Instrument inst = instCache.get(symbol); + + if (inst == null) + inst = new Instrument(symbol); + + // Don't populate market cache, as we don't use it for querying. + // Update cached instrument based on the latest market tick. + inst.update(tick); + + instCache.put(symbol, inst); + } + }); + + // Stream 10 million market data ticks into the system. + for (int i = 1; i <= 10_000_000; i++) { + int idx = RAND.nextInt(INSTRUMENTS.length); + + // Use gaussian distribution to ensure that + // numbers closer to 0 have higher probability. + double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian()); + + mktStmr.addData(INSTRUMENTS[idx], price); + + if (i % 500_000 == 0) + System.out.println("Number of tuples streamed into Ignite: " + i); + } + } + + // Select top 3 best performing instruments. + SqlFieldsQuery top3qry = new SqlFieldsQuery( + "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); + + // Execute queries. + List<List<?>> top3 = instCache.query(top3qry).getAll(); + + System.out.println("Top performing financial instruments: "); + + // Print top 10 words. 
+ ExamplesUtils.printQueryResults(top3); + } + } + } + + /** + * Rounds double value to two significant signs. + * + * @param val value to be rounded. + * @return rounded double value. + */ + private static double round2(double val) { + return Math.floor(100 * val + 0.5) / 100; + } + + /** + * Financial instrument. + */ + public static class Instrument implements Serializable { + /** Instrument symbol. */ + @QuerySqlField(index = true) + private final String symbol; + + /** Open price. */ + @QuerySqlField(index = true) + private double open; + + /** Close price. */ + @QuerySqlField(index = true) + private double latest; + + /** + * @param symbol Symbol. + */ + Instrument(String symbol) { + this.symbol = symbol; + } + + /** + * Updates this instrument based on the latest price. + * + * @param price Latest price. + */ + public void update(double price) { + if (open == 0) + open = price; + + this.latest = price; + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest - open) + ']'; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java new file mode 100644 index 0000000..58704ca --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.wordcount;
+
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.configuration.*;
+
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
+
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * Configuration for the streaming cache that stores the stream of words.
+ * The cache is set up with a created-expiry policy of 1 second, i.e. a sliding
+ * window in which entries expire 1 second after they were created.
+ */
+public class CacheConfig {
+    /**
+     * Builds the configuration of the streaming cache named {@code "words"}.
+     *
+     * @return Cache configuration with indexed key/value types and a 1 second created-expiry policy.
+     */
+    public static CacheConfiguration<AffinityUuid, String> wordCache() {
+        // Length of the sliding window: 1 second.
+        Duration window = new Duration(SECONDS, 1);
+
+        CacheConfiguration<AffinityUuid, String> wordCfg = new CacheConfiguration<>("words");
+
+        // Expire entries one window length after creation.
+        wordCfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(window)));
+
+        // Index all words streamed into the cache.
+        wordCfg.setIndexedTypes(AffinityUuid.class, String.class);
+
+        return wordCfg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
new file mode 100644
index 0000000..c32d8e8
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.wordcount;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.examples.*;
+
+import java.util.*;
+
+/**
+ * Periodically query popular words from the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamWords}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class QueryWords {
+    /**
+     * Starts a client node and then, in an endless loop with a 5 second pause, queries the streaming
+     * cache for count statistics and for the 10 most frequent words and prints both.
+     *
+     * @param args Command line arguments (not used).
+     * @throws Exception If the example fails.
+     */
+    public static void main(String[] args) throws Exception {
+        // This node joins the cluster as a client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            boolean srvPresent = ExamplesUtils.hasServerNodes(ignite);
+
+            if (!srvPresent)
+                return;
+
+            // Cache built from CacheConfig.wordCache(), i.e. a sliding window of 1 second of streamed data.
+            IgniteCache<AffinityUuid, String> wordCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+
+            // Ten words with the highest occurrence count.
+            SqlFieldsQuery topWordsQry = new SqlFieldsQuery(
+                "select _val, count(_val) as cnt from String group by _val order by cnt desc limit 10");
+
+            // Average, minimum and maximum of the per-word counts.
+            SqlFieldsQuery cntStatsQry = new SqlFieldsQuery(
+                "select avg(cnt), min(cnt), max(cnt) from (select count(_val) as cnt from String group by _val)");
+
+            // Poll forever, pausing 5 seconds between rounds.
+            for (;;) {
+                List<List<?>> topWords = wordCache.query(topWordsQry).getAll();
+                List<List<?>> cntStats = wordCache.query(cntStatsQry).getAll();
+
+                // First row carries avg, min and max, in that order.
+                List<?> agg = cntStats.get(0);
+
+                Object avg = agg.get(0);
+
+                // Skip the statistics line while the average is null.
+                if (avg != null)
+                    System.out.printf("Query results [avg=%.2f, min=%d, max=%d]%n", avg, agg.get(1), agg.get(2));
+
+                // Print top 10 words.
+                ExamplesUtils.printQueryResults(topWords);
+
+                Thread.sleep(5000);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
new file mode 100644
index 0000000..c59fa51
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.wordcount;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.examples.*;
+
+import java.io.*;
+
+/**
+ * Stream words into Ignite cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamWords}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class StreamWords {
+    /** Name of the text resource to stream, resolved relative to this class. */
+    private static final String BOOK_RSRC = "alice-in-wonderland.txt";
+
+    /**
+     * Starts a client node and streams the words of the bundled text into the cache built from
+     * {@link CacheConfig#wordCache()}, re-reading the text in an endless loop.
+     *
+     * @param args Command line arguments (not used).
+     * @throws Exception If the example fails, including {@link FileNotFoundException}
+     *      when the text resource cannot be found.
+     */
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of the streaming data.
+            IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+
+            try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
+                // Stream words from "alice-in-wonderland" book, starting over whenever the end is reached.
+                while (true) {
+                    InputStream in = StreamWords.class.getResourceAsStream(BOOK_RSRC);
+
+                    // getResourceAsStream() returns null for a missing resource; fail with a clear
+                    // message instead of a NullPointerException inside InputStreamReader.
+                    if (in == null)
+                        throw new FileNotFoundException("Resource not found on classpath: " + BOOK_RSRC);
+
+                    // Decode with an explicit charset rather than the platform default.
+                    // NOTE(review): assumes the bundled text is UTF-8 (plain ASCII qualifies) - confirm.
+                    try (LineNumberReader rdr = new LineNumberReader(
+                        new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8))) {
+                        for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+                            for (String word : line.split(" ")) {
+                                // Consecutive spaces yield empty tokens; skip them.
+                                if (!word.isEmpty())
+                                    // Stream words into Ignite.
+                                    // By using AffinityUuid we ensure that identical
+                                    // words are processed on the same cluster node.
+                                    stmr.addData(new AffinityUuid(word), word);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}