Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-430 9afb1973e -> cf9ddc5c5


ignite-430 IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cf9ddc5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cf9ddc5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cf9ddc5c

Branch: refs/heads/ignite-430
Commit: cf9ddc5c5f9c682003c14ad95f96f4f82d1a42e9
Parents: 9afb197
Author: Andrey Gura <ag...@gridgain.com>
Authored: Thu Mar 26 20:38:04 2015 +0300
Committer: Andrey Gura <ag...@gridgain.com>
Committed: Thu Mar 26 20:38:04 2015 +0300

----------------------------------------------------------------------
 examples/config/example-cache.xml               |   4 +-
 .../streaming/SocketStreamerExample.java        |  23 +--
 .../streaming/TextSocketStreamerExample.java    |  15 +-
 .../ignite/streaming/IgniteSocketStreamer.java  |  76 +------
 .../streaming/IgniteTextSocketStreamer.java     |  32 ++-
 .../apache/ignite/streaming/StreamReceiver.java | 188 +++++++++++++++++
 .../streaming/IgniteSocketStreamerTest.java     |   9 +-
 .../streaming/IgniteTextSocketStreamerTest.java |  11 +-
 .../ignite/streaming/StreamReceiverTest.java    | 201 +++++++++++++++++++
 9 files changed, 458 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/examples/config/example-cache.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-cache.xml 
b/examples/config/example-cache.xml
index 706f959..0bce70e 100644
--- a/examples/config/example-cache.xml
+++ b/examples/config/example-cache.xml
@@ -152,8 +152,8 @@
                         to our documentation: 
http://doc.gridgain.org/latest/Automatic+Node+Discovery
                     -->
                     <!-- Uncomment static IP finder to enable static-based 
discovery of initial nodes. -->
-                    <!--<bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
-                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                    <!--<bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
                         <property name="addresses">
                             <list>
                                 <!-- In distributed environment, replace with 
actual host IP address. -->

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
index 04b7101..8ed4451 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.examples.streaming;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.examples.*;
 import org.apache.ignite.examples.datagrid.*;
 import org.apache.ignite.lang.*;
@@ -91,24 +90,9 @@ public class SocketStreamerExample {
                 IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, 
String> sockStmr =
                     new IgniteSocketStreamer<>(HOST, PORT, stmr, converter);
 
-                //sockStmr.loadData();
-
-                System.out.println(">>> Start streaming async");
-
                 IgniteFuture<Void> fut = sockStmr.start();
 
-                System.out.println(">>> Future obtained");
-
-                try {
-                    fut.get(3000);
-                }
-                catch (IgniteFutureTimeoutException e) {
-                    // No-op.
-                }
-
-                sockStmr.stop();
-
-                System.out.println(">>> Future completed for " + 
fut.duration());
+                fut.get();
             }
 
             long end = System.currentTimeMillis();
@@ -131,11 +115,8 @@ public class SocketStreamerExample {
                      ObjectOutputStream oos =
                          new ObjectOutputStream(new 
BufferedOutputStream(sock.getOutputStream()))) {
 
-                    for (int i = 0; i < ENTRY_COUNT; i++) {
+                    for (int i = 0; i < ENTRY_COUNT; i++)
                         oos.writeObject(new IgniteBiTuple<>(i, 
Integer.toString(i)));
-                        if (i > 0 && i % 1000 == 0)
-                            System.out.println(">>> " + i + " events sent.");
-                    }
                 }
                 catch (IOException e) {
                     // No-op.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
index d6c6daa..49de6cb 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
@@ -92,7 +92,20 @@ public class TextSocketStreamerExample {
                 IgniteTextSocketStreamer<Integer, String> sockStmr =
                     new IgniteTextSocketStreamer<>(HOST, PORT, stmr, 
converter);
 
-                sockStmr.loadData();
+                IgniteFuture<Void> fut = sockStmr.start();
+
+                try {
+                    fut.get(500);
+                } catch (IgniteFutureTimeoutException e) {
+                    // No-op.
+                }
+
+                //fut.get();
+
+                sockStmr.stop();
+
+                System.out.println(">>> Future done: " + fut.isDone());
+                System.out.println(">>> Future canceled: " + 
fut.isCancelled());
             }
 
             long end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
 
b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
index cce4570..e599cdd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
@@ -18,13 +18,8 @@
 package org.apache.ignite.streaming;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.logger.*;
-import org.apache.ignite.thread.*;
 
 import java.io.*;
 import java.net.*;
@@ -38,24 +33,13 @@ import java.util.*;
  * @param <K> Cache entry key type.
  * @param <V> Cache entry value type.
  */
-public class IgniteSocketStreamer<E, K, V> {
-    /** Logger. */
-    private static final IgniteLogger log = new NullLogger();
-
+public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E,K,V> {
     /** Host. */
     private final String host;
 
     /** Port. */
     private final int port;
 
-    private volatile GridWorker worker;
-
-    /** Target streamer. */
-    protected final IgniteDataStreamer<K, V> streamer;
-
-    /** Stream to entries iterator transformer. */
-    protected final IgniteClosure<E, Map.Entry<K, V>> converter;
-
     /**
      * Constructs socket streamer.
      *
@@ -70,20 +54,16 @@ public class IgniteSocketStreamer<E, K, V> {
         IgniteDataStreamer<K, V> streamer,
         IgniteClosure<E, Map.Entry<K, V>> converter
     ) {
-        A.notNull(streamer, "streamer is null");
+        super(streamer, converter);
+
         A.notNull(host, "host is null");
-        A.notNull(converter, "converter is null");
 
         this.host = host;
         this.port = port;
-        this.streamer = streamer;
-        this.converter = converter;
     }
 
-    /**
-     * Performs loading of data stream.
-     */
-    public void loadData() {
+    /** {@inheritDoc} */
+    @Override protected void loadData() {
         try (Socket sock = new Socket(host, port)) {
             loadData(sock);
         }
@@ -92,41 +72,19 @@ public class IgniteSocketStreamer<E, K, V> {
         }
     }
 
-    public IgniteFuture<Void> start() {
-
-        final GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
-
-        ReceiverWorker worker = new ReceiverWorker(
-            "GRID???", "Socket streamer receiver", log, new 
GridWorkerListenerAdapter() {
-            @Override public void onStopped(GridWorker w) {
-                fut.onDone();
-            }
-        });
-
-        this.worker = worker;
-
-        new IgniteThread(worker).start();
-
-        return new IgniteFutureImpl<>(fut);
-    }
-
-    public void stop() {
-        worker.cancel();
-    }
-
     /**
      * Reads data from socket and loads them into target data stream.
      *
      * @param sock Socket.
      */
     @SuppressWarnings("unchecked")
-    protected void loadData(Socket sock) throws IOException {
+    private void loadData(Socket sock) throws IOException {
         try (ObjectInputStream ois = new ObjectInputStream(new 
BufferedInputStream(sock.getInputStream()))) {
-            while (true) {
+            while (!isStopped()) {
                 try {
                     E element = (E) ois.readObject();
 
-                    streamer.addData(converter.apply(element));
+                    addData(element);
                 }
                 catch (EOFException e) {
                     break;
@@ -137,22 +95,4 @@ public class IgniteSocketStreamer<E, K, V> {
             }
         }
     }
-
-    private class ReceiverWorker extends GridWorker {
-
-        private final GridWorkerListener lsnr;
-
-        protected ReceiverWorker(String gridName, String name, IgniteLogger 
log, GridWorkerListener lsnr) {
-            super(gridName, name, log, lsnr);
-            this.lsnr = lsnr;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-            lsnr.onStarted(this);
-            loadData();
-            lsnr.onStopped(this);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
 
b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
index 6faaeb6..a43d26b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.streaming;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 
 import java.io.*;
@@ -31,7 +32,13 @@ import java.util.*;
  * @param <K> Cache entry key type.
  * @param <V> Cache entry value type.
  */
-public class IgniteTextSocketStreamer<K, V> extends 
IgniteSocketStreamer<String, K, V> {
+public class IgniteTextSocketStreamer<K, V> extends StreamReceiver<String, K, 
V> {
+    /** Host. */
+    private final String host;
+
+    /** Port. */
+    private final int port;
+
     /**
      * Constructs text socket streamer.
      *
@@ -46,16 +53,31 @@ public class IgniteTextSocketStreamer<K, V> extends 
IgniteSocketStreamer<String,
         IgniteDataStreamer<K, V> streamer,
         IgniteClosure<String, Map.Entry<K, V>> converter
     ) {
-        super(host, port, streamer, converter);
+        super(streamer, converter);
+
+        A.notNull(host, "host is null");
+
+        this.host = host;
+        this.port = port;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void loadData() {
+        try (Socket sock = new Socket(host, port)) {
+            loadData(sock);
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
     }
 
     /** {@inheritDoc} */
-    @Override protected void loadData(Socket sock) throws IOException {
+    private void loadData(Socket sock) throws IOException {
         try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(sock.getInputStream(), "UTF-8"))) {
             String val;
 
-            while ((val = reader.readLine()) != null)
-                streamer.addData(converter.apply(val));
+            while (!isStopped() && (val = reader.readLine()) != null)
+                addData(val);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java 
b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
new file mode 100644
index 0000000..07d6775
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Base implementation of stream receiver.
+ *
+ * @param <E>
+ * @param <K>
+ * @param <V>
+ */
+public abstract class StreamReceiver<E, K, V> {
+    /** Object monitor. */
+    private final Object lock = new Object();
+
+    /** Stop latch. */
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+    /** State. */
+    private volatile State state = State.INITIALIZED;
+
+    /** Target streamer. */
+    private final IgniteDataStreamer<K, V> streamer;
+
+    /** Element to entries transformer. */
+    private final IgniteClosure<E, Map.Entry<K, V>> converter;
+
+    /**
+     * Constructs stream receiver.
+     *
+     * @param streamer Streamer.
+     * @param converter Element to entries transformer.
+     */
+    public StreamReceiver(IgniteDataStreamer<K, V> streamer, IgniteClosure<E, 
Map.Entry<K, V>> converter) {
+        A.notNull(streamer, "streamer is null");
+        A.notNull(converter, "converter is null");
+
+        this.streamer = streamer;
+        this.converter = converter;
+    }
+
+    /**
+     * Starts streamer.
+     */
+    public IgniteFuture<Void> start() {
+        synchronized (lock) {
+            if (state != State.INITIALIZED)
+                throw new IllegalStateException("Receiver in " + state + " 
state can't be started.");
+
+            GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+            new Thread(new Receiver(fut)).start();
+
+            state = State.STARTED;
+
+            return new IgniteFutureImpl<>(fut);
+        }
+    }
+
+    /**
+     * Stops streamer.
+     */
+    public void stop() {
+        synchronized (lock) {
+            if (state != State.STARTED)
+                throw new IllegalStateException("Receiver in " + state + " 
state can't be stopped.");
+
+            state = State.STOPPED;
+
+            try {
+                stopLatch.await();
+            } catch (InterruptedException e) {
+                // No-op.
+            }
+        }
+    }
+
+    /**
+     * Returns stream receiver state.
+     *
+     * @return stream receiver state.
+     */
+    public State state() {
+        return state;
+    }
+
+    /**
+     * Checks whether receiver is started or not.
+     *
+     * @return {@code True} if receiver is started, {@code false} - otherwise.
+     */
+    public boolean isStarted() {
+        return state == State.STARTED;
+    }
+
+    /**
+     * Checks whether receiver is stopped or not.
+     *
+     * @return {@code True} if receiver is stopped, {@code false} - otherwise.
+     */
+    public boolean isStopped() {
+        return state == State.STOPPED;
+    }
+
+    /**
+     * Performs actual loading of data. Override this method in order to 
implement own data loading functionality.
+     */
+    protected abstract void loadData();
+
+    /**
+     * Convert stream data to cache entry and transfer it to the target 
streamer.
+     *
+     * @param element Element.
+     */
+    protected void addData(E element) {
+        streamer.addData(converter.apply(element));
+    }
+
+    /**
+     * Receiver state.
+     */
+    public enum State {
+        /** New. */
+        INITIALIZED,
+        /** Started. */
+        STARTED,
+        /** Stopped. */
+        STOPPED
+    }
+
+    /**
+     * Receiver worker that actually receives data from socket.
+     */
+    private class Receiver implements Runnable {
+        /** Future. */
+        private final GridFutureAdapter<Void> fut;
+
+        /**
+         * @param fut Future.
+         */
+        public Receiver(GridFutureAdapter<Void> fut) {
+            this.fut = fut;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            Throwable err = null;
+
+            try {
+                loadData();
+            }
+            catch (Throwable e) {
+                err = e;
+            }
+            finally {
+                if (state == State.STOPPED)
+                    fut.onCancelled();
+                else
+                    fut.onDone(null, err);
+
+                stopLatch.countDown();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
index 1bb0088..a88c214 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
@@ -60,7 +60,7 @@ public class IgniteSocketStreamerTest extends 
GridCommonAbstractTest {
     /**
      * Tests data loading.
      */
-    public void testLoadData() throws Exception {
+    public void testStreamer() throws Exception {
         try (Ignite g = startGrid()) {
 
             IgniteCache<Integer, String> cache = g.jcache(null);
@@ -81,7 +81,12 @@ public class IgniteSocketStreamerTest extends 
GridCommonAbstractTest {
                 IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, 
String> sockStmr =
                     new IgniteSocketStreamer<>(HOST, PORT, stmr, converter);
 
-                sockStmr.loadData();
+                IgniteFuture<Void> fut = sockStmr.start();
+
+                fut.get();
+
+                assertTrue(fut.isDone());
+                assertFalse(fut.isCancelled());
             }
 
             assertEquals(ENTRY_CNT, cache.size());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
index 34318c8..7ba2b13 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
@@ -43,7 +43,7 @@ public class IgniteTextSocketStreamerTest extends 
GridCommonAbstractTest {
     /**
      * Tests data loading.
      */
-    public void testLoadData() throws Exception {
+    public void testStream() throws Exception {
         try (Ignite g = startGrid()) {
 
             IgniteCache<Integer, String> cache = g.jcache(null);
@@ -65,7 +65,14 @@ public class IgniteTextSocketStreamerTest extends 
GridCommonAbstractTest {
                 IgniteTextSocketStreamer<Integer, String> sockStmr =
                     new IgniteTextSocketStreamer<>(HOST, PORT, stmr, 
converter);
 
-                sockStmr.loadData();
+                IgniteFuture<Void> fut = sockStmr.start();
+
+                fut.get();
+
+                System.out.println(">>> STATE " + sockStmr.state());
+
+                assertTrue(fut.isDone());
+                assertFalse(fut.isCancelled());
             }
 
             assertEquals(ENTRY_CNT, cache.size());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
 
b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
new file mode 100644
index 0000000..fd414c6
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
@@ -0,0 +1,201 @@
+package org.apache.ignite.streaming;
+
+import junit.framework.TestCase;
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Tests {@link StreamReceiver} state transitions and the flags of the future returned by {@code start()}.
+ */
+public class StreamReceiverTest extends TestCase {
+    /** Converts an integer into an (integer, decimal string) entry. */
+    private static final IgniteClosure<Integer, Map.Entry<Integer, String>> 
CONVERTER =
+        new IgniteClosure<Integer, Map.Entry<Integer, String>>() {
+        @Override public Map.Entry<Integer, String> apply(Integer input) {
+            return new IgniteBiTuple<>(input, input.toString());
+        }
+    };
+
+    private static final IgniteDataStreamer<Integer, String> STMR = new 
DataStreamerStub<>();
+
+    private volatile boolean finished = false;
+
+    public void testName() throws Exception {
+        StreamReceiver<Integer, Integer, String> receiver =
+            new StreamReceiver<Integer, Integer, String>(STMR, CONVERTER) {
+                @Override protected void loadData() {
+                    while (!isStopped() && !finished) {
+                        try {
+                            Thread.sleep(50);
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            };
+
+        assertEquals(StreamReceiver.State.INITIALIZED, receiver.state());
+        assertFalse(receiver.isStarted());
+        assertFalse(receiver.isStopped());
+
+        IgniteFuture<Void> fut = receiver.start();
+
+        assertEquals(StreamReceiver.State.STARTED, receiver.state());
+
+        assertTrue(receiver.isStarted());
+        assertFalse(receiver.isStopped());
+
+        assertFalse(fut.isDone());
+        assertFalse(fut.isCancelled());
+
+        try {
+            fut.get(500);
+        }
+        catch (IgniteException e) {
+            // No-op.
+        }
+
+        assertEquals(StreamReceiver.State.STARTED, receiver.state());
+        assertTrue(receiver.isStarted());
+        assertFalse(receiver.isStopped());
+
+        assertFalse(fut.isDone());
+        assertFalse(fut.isCancelled());
+
+        //finished = true;
+        receiver.stop();
+
+        fut.get();
+
+        assertEquals(StreamReceiver.State.STOPPED, receiver.state());
+
+        assertFalse(receiver.isStarted());
+        assertTrue(receiver.isStopped());
+
+        assertTrue(fut.isDone());
+        assertFalse(fut.isCancelled());
+
+    }
+
+    private static class DataStreamerStub<K, V> implements 
IgniteDataStreamer<K, V> {
+
+        /** {@inheritDoc} */
+        @Override public String cacheName() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean allowOverwrite() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void allowOverwrite(boolean allowOverwrite) throws 
IgniteException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean skipStore() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void skipStore(boolean skipStore) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int perNodeBufferSize() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void perNodeBufferSize(int bufSize) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int perNodeParallelOperations() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void perNodeParallelOperations(int parallelOps) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public long autoFlushFrequency() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void autoFlushFrequency(long autoFlushFreq) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> future() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void deployClass(Class<?> depCls) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void updater(Updater<K, V> updater) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> removeData(K key) throws 
IgniteException, IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(K key, @Nullable V val) 
throws IgniteException, IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws 
IgniteException, IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(Collection<? extends 
Map.Entry<K, V>> entries)
+            throws IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(Map<K, V> entries) throws 
IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void flush() throws IgniteException, 
IllegalStateException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void tryFlush() throws IgniteException, 
IllegalStateException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close(boolean cancel) throws IgniteException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteException {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file

Reply via email to