Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-430 cf9ddc5c5 -> 5f2830daa


ignite-430 IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5f2830da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5f2830da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5f2830da

Branch: refs/heads/ignite-430
Commit: 5f2830daa3558a8c365990de88b61a6bf5b386e8
Parents: cf9ddc5
Author: Andrey Gura <ag...@okko.tv>
Authored: Fri Mar 27 11:23:54 2015 +0300
Committer: Andrey Gura <ag...@okko.tv>
Committed: Fri Mar 27 11:23:54 2015 +0300

----------------------------------------------------------------------
 examples/config/example-cache.xml               |   4 +-
 .../ignite/streaming/IgniteSocketStreamer.java  |   2 +-
 .../apache/ignite/streaming/StreamReceiver.java |  14 ++-
 .../streaming/IgniteTextSocketStreamerTest.java |   5 +-
 .../ignite/streaming/StreamReceiverTest.java    | 104 +++++++++++++++----
 5 files changed, 95 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f2830da/examples/config/example-cache.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-cache.xml 
b/examples/config/example-cache.xml
index 0bce70e..706f959 100644
--- a/examples/config/example-cache.xml
+++ b/examples/config/example-cache.xml
@@ -152,8 +152,8 @@
                         to our documentation: 
http://doc.gridgain.org/latest/Automatic+Node+Discovery
                     -->
                     <!-- Uncomment static IP finder to enable static-based 
discovery of initial nodes. -->
-                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
-                    <!--<bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
+                    <!--<bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
+                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                         <property name="addresses">
                             <list>
                                 <!-- In distributed environment, replace with 
actual host IP address. -->

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f2830da/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
 
b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
index e599cdd..dc53e0f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
@@ -33,7 +33,7 @@ import java.util.*;
  * @param <K> Cache entry key type.
  * @param <V> Cache entry value type.
  */
-public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E,K,V> {
+public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E, K, V> {
     /** Host. */
     private final String host;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f2830da/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java 
b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
index 07d6775..62e3641 100644
--- a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
+++ b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
@@ -28,9 +28,9 @@ import java.util.concurrent.*;
 /**
  * Base implementation of stream receiver.
  *
- * @param <E>
- * @param <K>
- * @param <V>
+ * @param <E> Type of stream element.
+ * @param <K> Type of cache entry key.
+ * @param <V> Type of cache entry value/
  */
 public abstract class StreamReceiver<E, K, V> {
     /** Object monitor. */
@@ -92,7 +92,8 @@ public abstract class StreamReceiver<E, K, V> {
 
             try {
                 stopLatch.await();
-            } catch (InterruptedException e) {
+            }
+            catch (InterruptedException e) {
                 // No-op.
             }
         }
@@ -178,8 +179,11 @@ public abstract class StreamReceiver<E, K, V> {
             finally {
                 if (state == State.STOPPED)
                     fut.onCancelled();
-                else
+                else {
+                    state = State.STOPPED;
+
                     fut.onDone(null, err);
+                }
 
                 stopLatch.countDown();
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f2830da/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
index 7ba2b13..6f9b228 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
@@ -69,14 +69,13 @@ public class IgniteTextSocketStreamerTest extends 
GridCommonAbstractTest {
 
                 fut.get();
 
-                System.out.println(">>> STATE " + sockStmr.state());
-
                 assertTrue(fut.isDone());
                 assertFalse(fut.isCancelled());
             }
 
             assertEquals(ENTRY_CNT, cache.size());
-        } finally {
+        }
+        finally {
             stopAllGrids();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f2830da/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
 
b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
index fd414c6..660f47e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
@@ -8,35 +8,44 @@ import org.jetbrains.annotations.*;
 import java.util.*;
 
 /**
- *
+ * Tests for {@link StreamReceiver}.
  */
 public class StreamReceiverTest extends TestCase {
-
+    /** Converter. */
     private static final IgniteClosure<Integer, Map.Entry<Integer, String>> 
CONVERTER =
         new IgniteClosure<Integer, Map.Entry<Integer, String>>() {
-        @Override public Map.Entry<Integer, String> apply(Integer input) {
-            return new IgniteBiTuple<>(input, input.toString());
-        }
+            @Override public Map.Entry<Integer, String> apply(Integer input) {
+                return new IgniteBiTuple<>(input, input.toString());
+            }
     };
 
+    /** Stmr. */
     private static final IgniteDataStreamer<Integer, String> STMR = new 
DataStreamerStub<>();
 
-    private volatile boolean finished = false;
-
-    public void testName() throws Exception {
-        StreamReceiver<Integer, Integer, String> receiver =
-            new StreamReceiver<Integer, Integer, String>(STMR, CONVERTER) {
-                @Override protected void loadData() {
-                    while (!isStopped() && !finished) {
-                        try {
-                            Thread.sleep(50);
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
+    /** Receiver. */
+    private final StreamReceiver<Integer, Integer, String> receiver =
+        new StreamReceiver<Integer, Integer, String>(STMR, CONVERTER) {
+            @Override protected void loadData() {
+                while (!isStopped() && !terminatedNormally) {
+                    try {
+                        Thread.sleep(50);
+                    }
+                    catch (InterruptedException e) {
+                        // No-op.
                     }
                 }
-            };
-
+            }
+        };
+
+    /** Terminated normally flag. */
+    private volatile boolean terminatedNormally;
+
+    /**
+     * Tests receiver behavior in case of normal termination.
+     *
+     * @throws Exception If error occurred.
+     */
+    public void testTerminatedNormally() throws Exception {
         assertEquals(StreamReceiver.State.INITIALIZED, receiver.state());
         assertFalse(receiver.isStarted());
         assertFalse(receiver.isStopped());
@@ -65,8 +74,7 @@ public class StreamReceiverTest extends TestCase {
         assertFalse(fut.isDone());
         assertFalse(fut.isCancelled());
 
-        //finished = true;
-        receiver.stop();
+        terminatedNormally = true;
 
         fut.get();
 
@@ -77,9 +85,59 @@ public class StreamReceiverTest extends TestCase {
 
         assertTrue(fut.isDone());
         assertFalse(fut.isCancelled());
+    }
+
+    /**
+     * Tests receiver behavior in case of forced termination.
+     *
+     * @throws Exception If error occurred.
+     */
+    public void testStopped() throws Exception {
+        assertEquals(StreamReceiver.State.INITIALIZED, receiver.state());
+        assertFalse(receiver.isStarted());
+        assertFalse(receiver.isStopped());
+
+        IgniteFuture<Void> fut = receiver.start();
+
+        assertEquals(StreamReceiver.State.STARTED, receiver.state());
+
+        assertTrue(receiver.isStarted());
+        assertFalse(receiver.isStopped());
+
+        assertFalse(fut.isDone());
+        assertFalse(fut.isCancelled());
+
+        try {
+            fut.get(500);
+        }
+        catch (IgniteException e) {
+            // No-op.
+        }
+
+        assertEquals(StreamReceiver.State.STARTED, receiver.state());
+        assertTrue(receiver.isStarted());
+        assertFalse(receiver.isStopped());
+
+        assertFalse(fut.isDone());
+        assertFalse(fut.isCancelled());
+
+        receiver.stop();
 
+        assertEquals(StreamReceiver.State.STOPPED, receiver.state());
+
+        assertFalse(receiver.isStarted());
+        assertTrue(receiver.isStopped());
+
+        assertTrue(fut.isDone());
+        assertTrue(fut.isCancelled());
     }
 
+    /**
+     * Receiver stub.
+     *
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
     private static class DataStreamerStub<K, V> implements 
IgniteDataStreamer<K, V> {
 
         /** {@inheritDoc} */
@@ -94,7 +152,7 @@ public class StreamReceiverTest extends TestCase {
 
         /** {@inheritDoc} */
         @Override public void allowOverwrite(boolean allowOverwrite) throws 
IgniteException {
-
+            // No-op.
         }
 
         /** {@inheritDoc} */
@@ -169,7 +227,7 @@ public class StreamReceiverTest extends TestCase {
 
         /** {@inheritDoc} */
         @Override public IgniteFuture<?> addData(Collection<? extends 
Map.Entry<K, V>> entries)
-            throws IllegalStateException {
+                throws IllegalStateException {
             return null;
         }
 

Reply via email to