# ignite-45 - fixing streaming.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f48c968c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f48c968c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f48c968c Branch: refs/heads/ignite-501 Commit: f48c968c43f522a23dd33451a9378e6f7682441a Parents: d49f9c8 Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Thu Mar 19 01:54:17 2015 -0400 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Thu Mar 19 01:54:17 2015 -0400 ---------------------------------------------------------------------- examples/config/example-streamer.xml | 4 ++-- .../examples/streaming/StreamingPopularNumbersExample.java | 5 ++--- .../java8/streaming/numbers/QueryPopularNumbers.java | 9 ++------- .../java8/streaming/numbers/StreamRandomNumbers.java | 3 ++- 4 files changed, 8 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f48c968c/examples/config/example-streamer.xml ---------------------------------------------------------------------- diff --git a/examples/config/example-streamer.xml b/examples/config/example-streamer.xml index e6a3a98..490fde8 100644 --- a/examples/config/example-streamer.xml +++ b/examples/config/example-streamer.xml @@ -120,7 +120,7 @@ <list> <bean class="org.apache.ignite.streamer.index.tree.StreamerTreeIndexProvider"> <property name="updater"> - <bean class="org.apache.ignite.examples.java8.streaming.StreamingPopularNumbersExample$IndexUpdater"/> + <bean class="org.apache.ignite.examples.java8.streaming.numbers.QueryPopularNumbers$IndexUpdater"/> </property> </bean> </list> @@ -130,7 +130,7 @@ <property name="stages"> <list> - <bean class="org.apache.ignite.examples.java8.streaming.StreamingPopularNumbersExample$StreamerStage"/> + <bean class="org.apache.ignite.examples.java8.streaming.numbers.QueryPopularNumbers$StreamerStage"/> </list> </property> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f48c968c/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java index 29cc2c1..a298932 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java @@ -145,9 +145,8 @@ public class StreamingPopularNumbersExample { stmr.allowOverwrite(true); // Transform data when processing. - stmr.receiver(new StreamTransformer<>(new EntryProcessor<Integer, Long, Object>() { - @Override - public Object process(MutableEntry<Integer, Long> e, Object... args) { + stmr.receiver(new StreamTransformer<>(new CacheEntryProcessor<Integer, Long, Object>() { + @Override public Object process(MutableEntry<Integer, Long> e, Object... args) { Long val = e.getValue(); e.setValue(val == null ? 1L : val + 1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f48c968c/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java index 47be047..c531bf1 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java @@ -24,7 +24,7 @@ import org.apache.ignite.examples.java8.*; import java.util.*; /** - * Real time popular numbers counter. + * Periodically query popular numbers from the streaming cache. * <p> * Remote nodes should always be started with special configuration file which * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. @@ -33,18 +33,13 @@ import java.util.*; * start node with {@code examples/config/example-compute.xml} configuration. */ public class QueryPopularNumbers { - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ public static void main(String[] args) throws Exception { // Mark this cluster member as client. Ignition.setClientMode(true); try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { // Start new cache or get existing one. + // The cache is configured with sliding window holding 1 second of the streaming data. try (IgniteCache<Integer, Long> stmCache = ignite.createCache(CacheConfig.configure())) { if (!ExamplesUtils.hasServerNodes(ignite)) return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f48c968c/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java index 96472a3..a7dcef7 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java @@ -49,8 +49,9 @@ public class StreamRandomNumbers { // Mark this cluster member as client. Ignition.setClientMode(true); - try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { + try (Ignite ignite = Ignition.start()) { // Create new cache or get existing one. + // The cache is configured with sliding window holding 1 second of the streaming data. try (IgniteCache<Integer, Long> stmCache = ignite.createCache(CacheConfig.configure())) { if (!ExamplesUtils.hasServerNodes(ignite)) return;