Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 55fd133db -> f1a46c4ad
# ignite-45 - fixing streaming. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6482ae56 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6482ae56 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6482ae56 Branch: refs/heads/ignite-45 Commit: 6482ae56b731df62890cc984f85e01cb8503bbff Parents: ad1a0ab Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Fri Mar 20 00:17:42 2015 -0400 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Fri Mar 20 00:17:42 2015 -0400 ---------------------------------------------------------------------- .../ignite/examples/java8/ExamplesUtils.java | 2 +- .../streaming/numbers/QueryPopularNumbers.java | 27 ++++++++-------- .../streaming/numbers/StreamRandomNumbers.java | 33 ++++++++++---------- 3 files changed, 30 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6482ae56/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java index 5854cdd..7b36fd9 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java @@ -101,7 +101,7 @@ public class ExamplesUtils { * @param res Query results. */ public static void printQueryResults(List<?> res) { - if (res == null) + if (res == null || res.isEmpty()) System.out.println("Query result set is empty."); else { System.out.println("Query results:"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6482ae56/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java index 9895d2e..c86eb37 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java @@ -44,25 +44,24 @@ public class QueryPopularNumbers { Ignition.setClientMode(true); try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { - // Start new cache or get existing one. + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + // The cache is configured with sliding window holding 1 second of the streaming data. - try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.configure())) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.configure()); - // Query top 10 popular numbers every 5 seconds. - while (true) { - // Select top 10 words. - SqlFieldsQuery top10 = new SqlFieldsQuery( - "select _key, _val from Long order by _val desc limit 10"); + // Query top 10 popular numbers every 5 seconds. + while (true) { + // Select top 10 words. + SqlFieldsQuery top10 = new SqlFieldsQuery( + "select _key, _val from Long order by _val desc limit 10"); - // Execute query. - List<List<?>> results = stmCache.queryFields(top10).getAll(); + // Execute query. + List<List<?>> results = stmCache.queryFields(top10).getAll(); - ExamplesUtils.printQueryResults(results); + ExamplesUtils.printQueryResults(results); - Thread.sleep(5000); - } + Thread.sleep(5000); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6482ae56/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java index fd2587f..eae5272 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java @@ -50,29 +50,28 @@ public class StreamRandomNumbers { Ignition.setClientMode(true); try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { - // Create new cache or get existing one. + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + // The cache is configured with sliding window holding 1 second of the streaming data. - try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.configure())) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.configure()); - try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { - // Allow data updates. - stmr.allowOverwrite(true); + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); - // Configure data transformation to count instances of the same word. - stmr.receiver(new StreamTransformer<>((e, arg) -> { - Long val = e.getValue(); + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer<>((e, arg) -> { + Long val = e.getValue(); - e.setValue(val == null ? 1L : val + 1); + e.setValue(val == null ? 1L : val + 1); - return null; - })); + return null; + })); - // Stream random numbers into the streamer cache. - while (true) - stmr.addData(RAND.nextInt(RANGE), 1L); - } + // Stream random numbers into the streamer cache. + while (true) + stmr.addData(RAND.nextInt(RANGE), 1L); } } }