Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-430 2f1e03bda -> 2077a88e4


ignite-430 IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2077a88e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2077a88e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2077a88e

Branch: refs/heads/ignite-430
Commit: 2077a88e4951dc839f96f1b8292a3ed1e416ffee
Parents: 2f1e03b
Author: Andrey Gura <ag...@gridgain.com>
Authored: Mon Mar 30 12:42:22 2015 +0300
Committer: Andrey Gura <ag...@gridgain.com>
Committed: Mon Mar 30 12:42:22 2015 +0300

----------------------------------------------------------------------
 .../streaming/SocketStreamerExample.java        | 51 +++++++++---------
 .../streaming/TextSocketStreamerExample.java    | 55 ++++++++++----------
 2 files changed, 52 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2077a88e/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
index 292bc28..5fea93e 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
@@ -18,24 +18,19 @@
 package org.apache.ignite.examples.streaming;
 
 import org.apache.ignite.*;
-import org.apache.ignite.examples.ExampleNodeStartup;
-import org.apache.ignite.examples.ExamplesUtils;
-import org.apache.ignite.examples.streaming.numbers.CacheConfig;
-import org.apache.ignite.examples.streaming.numbers.QueryPopularNumbers;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.streaming.IgniteSocketStreamer;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Map;
-import java.util.Random;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.streaming.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.net.*;
+import java.util.*;
 
 /**
- * Stream random numbers into the streaming cache.
+ * Stream random numbers into the streaming cache using {@link 
IgniteSocketStreamer}.
  * To start the example, you should:
  * <ul>
  *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
@@ -81,6 +76,18 @@ public class SocketStreamerExample {
                 // Allow data updates.
                 stmr.allowOverwrite(true);
 
+                // Configure data transformation to count instances of the 
same word.
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, 
Long> e, Object... objects)
+                        throws EntryProcessorException {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+
                 IgniteClosure<IgniteBiTuple<Integer, Long>, Map.Entry<Integer, 
Long>> converter =
                     new IgniteClosure<IgniteBiTuple<Integer, Long>, 
Map.Entry<Integer, Long>>() {
                         @Override public Map.Entry<Integer, Long> 
apply(IgniteBiTuple<Integer, Long> input) {
@@ -113,16 +120,8 @@ public class SocketStreamerExample {
                      ObjectOutputStream oos =
                          new ObjectOutputStream(new 
BufferedOutputStream(sock.getOutputStream()))) {
 
-                    while(true) {
-                        oos.writeObject(new 
IgniteBiTuple<>(RAND.nextInt(RANGE), (long) (RAND.nextInt(RANGE) + 1)));
-
-                        try {
-                            Thread.sleep(1);
-                        }
-                        catch (InterruptedException e) {
-                            // No-op.
-                        }
-                    }
+                    while(true)
+                        oos.writeObject(new 
IgniteBiTuple<>(RAND.nextInt(RANGE), 1L));
                 }
                 catch (IOException e) {
                     // No-op.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2077a88e/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
index d137d17..4677be1 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
@@ -18,24 +18,19 @@
 package org.apache.ignite.examples.streaming;
 
 import org.apache.ignite.*;
-import org.apache.ignite.examples.ExampleNodeStartup;
-import org.apache.ignite.examples.ExamplesUtils;
-import org.apache.ignite.examples.streaming.numbers.CacheConfig;
-import org.apache.ignite.examples.streaming.numbers.QueryPopularNumbers;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.streaming.IgniteTextSocketStreamer;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Map;
-import java.util.Random;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.streaming.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.net.*;
+import java.util.*;
 
 /**
- * Stream random numbers into the streaming cache.
+ * Stream random numbers into the streaming cache using {@link 
IgniteTextSocketStreamer}.
  * To start the example, you should:
  * <ul>
  *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
@@ -65,6 +60,9 @@ public class TextSocketStreamerExample {
      * @throws IgniteException If example execution failed.
      */
     public static void main(String[] args) throws IgniteException, 
InterruptedException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
         try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
             if (!ExamplesUtils.hasServerNodes(ignite))
                 return;
@@ -78,6 +76,18 @@ public class TextSocketStreamerExample {
                 // Allow data updates.
                 stmr.allowOverwrite(true);
 
+                // Configure data transformation to count instances of the 
same word.
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, 
Long> e, Object... objects)
+                        throws EntryProcessorException {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+
                 IgniteClosure<String, Map.Entry<Integer, Long>> converter =
                     new IgniteClosure<String, Map.Entry<Integer, Long>>() {
                         @Override public Map.Entry<Integer, Long> apply(String 
input) {
@@ -113,20 +123,9 @@ public class TextSocketStreamerExample {
 
 
                     while(true) {
-                        int key = RAND.nextInt(RANGE);
-
-                        int value = RAND.nextInt(RANGE) + 1;
-
-                        writer.write(Integer.toString(key) + '=' + 
Integer.toString(value));
+                        writer.write(Integer.toString(RAND.nextInt(RANGE)) + 
"=1");
 
                         writer.newLine();
-
-                        try {
-                            Thread.sleep(1);
-                        }
-                        catch (InterruptedException e) {
-                            // No-op.
-                        }
                     }
                 }
                 catch (IOException e) {

Reply via email to