Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-430 a30ec69a7 -> 9afb1973e


ignite-430 IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9afb1973
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9afb1973
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9afb1973

Branch: refs/heads/ignite-430
Commit: 9afb1973edade69b359527100268457bdb65124d
Parents: a30ec69
Author: Andrey Gura <ag...@gridgain.com>
Authored: Wed Mar 25 21:16:16 2015 +0300
Committer: Andrey Gura <ag...@gridgain.com>
Committed: Wed Mar 25 21:16:16 2015 +0300

----------------------------------------------------------------------
 .../streaming/SocketStreamerExample.java        | 24 +++++++++-
 .../ignite/streaming/IgniteSocketStreamer.java  | 49 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9afb1973/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
index 234ef76..04b7101 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
@@ -91,7 +91,24 @@ public class SocketStreamerExample {
                 IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr =
                     new IgniteSocketStreamer<>(HOST, PORT, stmr, converter);
 
-                sockStmr.loadData();
+                //sockStmr.loadData();
+
+                System.out.println(">>> Start streaming async");
+
+                IgniteFuture<Void> fut = sockStmr.start();
+
+                System.out.println(">>> Future obtained");
+
+                try {
+                    fut.get(3000);
+                }
+                catch (IgniteFutureTimeoutException e) {
+                    // No-op.
+                }
+
+                sockStmr.stop();
+
+                System.out.println(">>> Future completed for " + fut.duration());
             }
 
             long end = System.currentTimeMillis();
@@ -114,8 +131,11 @@ public class SocketStreamerExample {
                      ObjectOutputStream oos =
                          new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) {
 
-                    for (int i = 0; i < ENTRY_COUNT; i++)
+                    for (int i = 0; i < ENTRY_COUNT; i++) {
                         oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i)));
+                        if (i > 0 && i % 1000 == 0)
+                            System.out.println(">>> " + i + " events sent.");
+                    }
                 }
                 catch (IOException e) {
                     // No-op.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9afb1973/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
index 54dcdce..cce4570 100644
--- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
@@ -18,9 +18,13 @@
 package org.apache.ignite.streaming;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.logger.*;
+import org.apache.ignite.thread.*;
 
 import java.io.*;
 import java.net.*;
@@ -35,12 +39,17 @@ import java.util.*;
  * @param <V> Cache entry value type.
  */
 public class IgniteSocketStreamer<E, K, V> {
+    /** Logger. */
+    private static final IgniteLogger log = new NullLogger();
+
     /** Host. */
     private final String host;
 
     /** Port. */
     private final int port;
 
+    private volatile GridWorker worker;
+
     /** Target streamer. */
     protected final IgniteDataStreamer<K, V> streamer;
 
@@ -83,6 +92,28 @@ public class IgniteSocketStreamer<E, K, V> {
         }
     }
 
+    public IgniteFuture<Void> start() {
+
+        final GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+        ReceiverWorker worker = new ReceiverWorker(
+            "GRID???", "Socket streamer receiver", log, new GridWorkerListenerAdapter() {
+            @Override public void onStopped(GridWorker w) {
+                fut.onDone();
+            }
+        });
+
+        this.worker = worker;
+
+        new IgniteThread(worker).start();
+
+        return new IgniteFutureImpl<>(fut);
+    }
+
+    public void stop() {
+        worker.cancel();
+    }
+
     /**
      * Reads data from socket and loads them into target data stream.
      *
@@ -106,4 +137,22 @@ public class IgniteSocketStreamer<E, K, V> {
             }
         }
     }
+
+    private class ReceiverWorker extends GridWorker {
+
+        private final GridWorkerListener lsnr;
+
+        protected ReceiverWorker(String gridName, String name, IgniteLogger log, GridWorkerListener lsnr) {
+            super(gridName, name, log, lsnr);
+            this.lsnr = lsnr;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            lsnr.onStarted(this);
+            loadData();
+            lsnr.onStopped(this);
+        }
+    }
+
 }

Reply via email to