Repository: incubator-ignite Updated Branches: refs/heads/ignite-430 2f1e03bda -> 2077a88e4
ignite-430 IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2077a88e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2077a88e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2077a88e Branch: refs/heads/ignite-430 Commit: 2077a88e4951dc839f96f1b8292a3ed1e416ffee Parents: 2f1e03b Author: Andrey Gura <ag...@gridgain.com> Authored: Mon Mar 30 12:42:22 2015 +0300 Committer: Andrey Gura <ag...@gridgain.com> Committed: Mon Mar 30 12:42:22 2015 +0300 ---------------------------------------------------------------------- .../streaming/SocketStreamerExample.java | 51 +++++++++--------- .../streaming/TextSocketStreamerExample.java | 55 ++++++++++---------- 2 files changed, 52 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2077a88e/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java index 292bc28..5fea93e 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java @@ -18,24 +18,19 @@ package org.apache.ignite.examples.streaming; import org.apache.ignite.*; -import org.apache.ignite.examples.ExampleNodeStartup; -import org.apache.ignite.examples.ExamplesUtils; -import org.apache.ignite.examples.streaming.numbers.CacheConfig; -import org.apache.ignite.examples.streaming.numbers.QueryPopularNumbers; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.streaming.IgniteSocketStreamer; - -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.Map; -import java.util.Random; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.numbers.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.streaming.*; + +import javax.cache.processor.*; +import java.io.*; +import java.net.*; +import java.util.*; /** - * Stream random numbers into the streaming cache. + * Stream random numbers into the streaming cache using {@link IgniteSocketStreamer}. * To start the example, you should: * <ul> * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> @@ -81,6 +76,18 @@ public class SocketStreamerExample { // Allow data updates. stmr.allowOverwrite(true); + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer<Integer, Long>() { + @Override public Object process(MutableEntry<Integer, Long> e, Object... objects) + throws EntryProcessorException { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + IgniteClosure<IgniteBiTuple<Integer, Long>, Map.Entry<Integer, Long>> converter = new IgniteClosure<IgniteBiTuple<Integer, Long>, Map.Entry<Integer, Long>>() { @Override public Map.Entry<Integer, Long> apply(IgniteBiTuple<Integer, Long> input) { @@ -113,16 +120,8 @@ public class SocketStreamerExample { ObjectOutputStream oos = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) { - while(true) { - oos.writeObject(new IgniteBiTuple<>(RAND.nextInt(RANGE), (long) (RAND.nextInt(RANGE) + 1))); - - try { - Thread.sleep(1); - } - catch (InterruptedException e) { - // No-op. - } - } + while(true) + oos.writeObject(new IgniteBiTuple<>(RAND.nextInt(RANGE), 1L)); } catch (IOException e) { // No-op. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2077a88e/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java index d137d17..4677be1 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java @@ -18,24 +18,19 @@ package org.apache.ignite.examples.streaming; import org.apache.ignite.*; -import org.apache.ignite.examples.ExampleNodeStartup; -import org.apache.ignite.examples.ExamplesUtils; -import org.apache.ignite.examples.streaming.numbers.CacheConfig; -import org.apache.ignite.examples.streaming.numbers.QueryPopularNumbers; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.streaming.IgniteTextSocketStreamer; - -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.Map; -import java.util.Random; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.numbers.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.streaming.*; + +import javax.cache.processor.*; +import java.io.*; +import java.net.*; +import java.util.*; /** - * Stream random numbers into the streaming cache. + * Stream random numbers into the streaming cache using {@link IgniteTextSocketStreamer}. * To start the example, you should: * <ul> * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> @@ -65,6 +60,9 @@ public class TextSocketStreamerExample { * @throws IgniteException If example execution failed. */ public static void main(String[] args) throws IgniteException, InterruptedException { + // Mark this cluster member as client. + Ignition.setClientMode(true); + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { if (!ExamplesUtils.hasServerNodes(ignite)) return; @@ -78,6 +76,18 @@ public class TextSocketStreamerExample { // Allow data updates. stmr.allowOverwrite(true); + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer<Integer, Long>() { + @Override public Object process(MutableEntry<Integer, Long> e, Object... objects) + throws EntryProcessorException { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + IgniteClosure<String, Map.Entry<Integer, Long>> converter = new IgniteClosure<String, Map.Entry<Integer, Long>>() { @Override public Map.Entry<Integer, Long> apply(String input) { @@ -113,20 +123,9 @@ public class TextSocketStreamerExample { while(true) { - int key = RAND.nextInt(RANGE); - - int value = RAND.nextInt(RANGE) + 1; - - writer.write(Integer.toString(key) + '=' + Integer.toString(value)); + writer.write(Integer.toString(RAND.nextInt(RANGE)) + "=1"); writer.newLine(); - - try { - Thread.sleep(1); - } - catch (InterruptedException e) { - // No-op. - } } } catch (IOException e) {