Repository: incubator-ignite Updated Branches: refs/heads/ignite-430 a30ec69a7 -> 9afb1973e
ignite-430 IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9afb1973 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9afb1973 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9afb1973 Branch: refs/heads/ignite-430 Commit: 9afb1973edade69b359527100268457bdb65124d Parents: a30ec69 Author: Andrey Gura <ag...@gridgain.com> Authored: Wed Mar 25 21:16:16 2015 +0300 Committer: Andrey Gura <ag...@gridgain.com> Committed: Wed Mar 25 21:16:16 2015 +0300 ---------------------------------------------------------------------- .../streaming/SocketStreamerExample.java | 24 +++++++++- .../ignite/streaming/IgniteSocketStreamer.java | 49 ++++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9afb1973/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java index 234ef76..04b7101 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java @@ -91,7 +91,24 @@ public class SocketStreamerExample { IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr = new IgniteSocketStreamer<>(HOST, PORT, stmr, converter); - sockStmr.loadData(); + //sockStmr.loadData(); + + System.out.println(">>> Start streaming async"); + + IgniteFuture<Void> fut = sockStmr.start(); + + System.out.println(">>> Future obtained"); + + try { + fut.get(3000); + } + catch (IgniteFutureTimeoutException e) { + // No-op. + } + + sockStmr.stop(); + + System.out.println(">>> Future completed for " + fut.duration()); } long end = System.currentTimeMillis(); @@ -114,8 +131,11 @@ public class SocketStreamerExample { ObjectOutputStream oos = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) { - for (int i = 0; i < ENTRY_COUNT; i++) + for (int i = 0; i < ENTRY_COUNT; i++) { oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i))); + if (i > 0 && i % 1000 == 0) + System.out.println(">>> " + i + " events sent."); + } } catch (IOException e) { // No-op. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9afb1973/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java index 54dcdce..cce4570 100644 --- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java @@ -18,9 +18,13 @@ package org.apache.ignite.streaming; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; +import org.apache.ignite.logger.*; +import org.apache.ignite.thread.*; import java.io.*; import java.net.*; @@ -35,12 +39,17 @@ import java.util.*; * @param <V> Cache entry value type. */ public class IgniteSocketStreamer<E, K, V> { + /** Logger. */ + private static final IgniteLogger log = new NullLogger(); + /** Host. */ private final String host; /** Port. */ private final int port; + private volatile GridWorker worker; + /** Target streamer. */ protected final IgniteDataStreamer<K, V> streamer; @@ -83,6 +92,28 @@ public class IgniteSocketStreamer<E, K, V> { } } + public IgniteFuture<Void> start() { + + final GridFutureAdapter<Void> fut = new GridFutureAdapter<>(); + + ReceiverWorker worker = new ReceiverWorker( + "GRID???", "Socket streamer receiver", log, new GridWorkerListenerAdapter() { + @Override public void onStopped(GridWorker w) { + fut.onDone(); + } + }); + + this.worker = worker; + + new IgniteThread(worker).start(); + + return new IgniteFutureImpl<>(fut); + } + + public void stop() { + worker.cancel(); + } + /** * Reads data from socket and loads them into target data stream. * @@ -106,4 +137,22 @@ public class IgniteSocketStreamer<E, K, V> { } } } + + private class ReceiverWorker extends GridWorker { + + private final GridWorkerListener lsnr; + + protected ReceiverWorker(String gridName, String name, IgniteLogger log, GridWorkerListener lsnr) { + super(gridName, name, log, lsnr); + this.lsnr = lsnr; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + lsnr.onStarted(this); + loadData(); + lsnr.onStopped(this); + } + } + }