// Waiting for all data to be streamed.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ea1d6213
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ea1d6213
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ea1d6213

Branch: refs/heads/ignite-430
Commit: ea1d6213217a48ac674d1b175b5a849016568dbd
Parents: e0b86cd
Author: Andrey Gura <ag...@okko.tv>
Authored: Sun Mar 29 18:19:18 2015 +0300
Committer: Andrey Gura <ag...@okko.tv>
Committed: Sun Mar 29 18:19:18 2015 +0300

----------------------------------------------------------------------
 .../streaming/SocketStreamerExample.java        |  9 +--
 .../streaming/TextSocketStreamerExample.java    | 26 +++----
 .../apache/ignite/streaming/StreamReceiver.java | 31 +-------
 .../apache/ignite/streaming/package-info.java   | 21 +++++
 .../streaming/IgniteSocketStreamerTest.java     | 27 +++++--
 .../streaming/IgniteTextSocketStreamerTest.java | 30 +++++--
 .../ignite/streaming/StreamReceiverTest.java    | 82 ++------------------
 pom.xml                                         |  4 +-
 8 files changed, 89 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
index 8ed4451..cf24455 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
@@ -19,7 +19,6 @@ package org.apache.ignite.examples.streaming;
 
 import org.apache.ignite.*;
 import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.datagrid.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.streaming.*;
 
@@ -33,7 +32,7 @@ import java.util.*;
  * Remote nodes should always be started with special configuration file which
  * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-cache.xml'}.
  * <p>
- * Alternatively you can run {@link CacheNodeStartup} in another JVM which will
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will
  * start node with {@code examples/config/example-cache.xml} configuration.
  */
 public class SocketStreamerExample {
@@ -68,7 +67,7 @@ public class SocketStreamerExample {
             startServer();
 
             // Clean up caches on all nodes before run.
-            ignite.jcache(CACHE_NAME).clear();
+            ignite.cache(CACHE_NAME).clear();
 
             System.out.println();
             System.out.println(">>> Cache clear finished.");
@@ -90,9 +89,7 @@ public class SocketStreamerExample {
                 IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, 
String> sockStmr =
                     new IgniteSocketStreamer<>(HOST, PORT, stmr, converter);
 
-                IgniteFuture<Void> fut = sockStmr.start();
-
-                fut.get();
+                sockStmr.start();
             }
 
             long end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
index 49de6cb..6731a3c 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
@@ -20,7 +20,6 @@ package org.apache.ignite.examples.streaming;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.datagrid.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.streaming.*;
 
@@ -34,13 +33,10 @@ import java.util.*;
  * Remote nodes should always be started with special configuration file which
  * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-cache.xml'}.
  * <p>
- * Alternatively you can run {@link CacheNodeStartup} in another JVM which will
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will
  * start node with {@code examples/config/example-cache.xml} configuration.
  */
 public class TextSocketStreamerExample {
-    /** Cache name. */
-    private static final String CACHE_NAME = "partitioned";
-
     /** Number of entries to load. */
     private static final int ENTRY_COUNT = 500000;
 
@@ -69,14 +65,14 @@ public class TextSocketStreamerExample {
             startServer();
 
             // Clean up caches on all nodes before run.
-            ignite.jcache(CACHE_NAME).clear();
+            ignite.cache(null).clear();
 
             System.out.println();
             System.out.println(">>> Cache clear finished.");
 
             long start = System.currentTimeMillis();
 
-            try (IgniteDataStreamer<Integer, String> stmr = 
ignite.dataStreamer(CACHE_NAME)) {
+            try (IgniteDataStreamer<Integer, String> stmr = 
ignite.dataStreamer(null)) {
                 // Configure loader.
                 stmr.perNodeBufferSize(1024);
                 stmr.perNodeParallelOperations(8);
@@ -92,25 +88,21 @@ public class TextSocketStreamerExample {
                 IgniteTextSocketStreamer<Integer, String> sockStmr =
                     new IgniteTextSocketStreamer<>(HOST, PORT, stmr, 
converter);
 
-                IgniteFuture<Void> fut = sockStmr.start();
+                sockStmr.start();
 
+                //TODO: wait ???
                 try {
-                    fut.get(500);
-                } catch (IgniteFutureTimeoutException e) {
-                    // No-op.
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
                 }
 
-                //fut.get();
-
                 sockStmr.stop();
-
-                System.out.println(">>> Future done: " + fut.isDone());
-                System.out.println(">>> Future canceled: " + 
fut.isCancelled());
             }
 
             long end = System.currentTimeMillis();
 
-            System.out.println(">>> Cache Size " + 
ignite.jcache(CACHE_NAME).size(CachePeekMode.PRIMARY));
+            System.out.println(">>> Cache Size " + 
ignite.cache(null).size(CachePeekMode.PRIMARY));
 
             System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + 
(end - start) + "ms.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
index 62e3641..50719a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
+++ b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.streaming;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 
@@ -65,18 +64,14 @@ public abstract class StreamReceiver<E, K, V> {
     /**
      * Starts streamer.
      */
-    public IgniteFuture<Void> start() {
+    public void start() {
         synchronized (lock) {
             if (state != State.INITIALIZED)
                 throw new IllegalStateException("Receiver in " + state + " 
state can't be started.");
 
-            GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
-
-            new Thread(new Receiver(fut)).start();
+            new Thread(new Receiver()).start();
 
             state = State.STARTED;
-
-            return new IgniteFutureImpl<>(fut);
         }
     }
 
@@ -156,35 +151,15 @@ public abstract class StreamReceiver<E, K, V> {
      * Receiver worker that actually receives data from socket.
      */
     private class Receiver implements Runnable {
-        /** Future. */
-        private final GridFutureAdapter<Void> fut;
-
-        /**
-         * @param fut Future.
-         */
-        public Receiver(GridFutureAdapter<Void> fut) {
-            this.fut = fut;
-        }
-
         /** {@inheritDoc} */
         @Override public void run() {
-            Throwable err = null;
-
             try {
                 loadData();
             }
             catch (Throwable e) {
-                err = e;
+                //TODO: restart
             }
             finally {
-                if (state == State.STOPPED)
-                    fut.onCancelled();
-                else {
-                    state = State.STOPPED;
-
-                    fut.onDone(null, err);
-                }
-
                 stopLatch.countDown();
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java b/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java
new file mode 100644
index 0000000..79bb27e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains Ignite streaming classes.
+ */
+package org.apache.ignite.streaming;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
index a88c214..9164352 100644
--- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.testframework.junits.common.*;
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.cache.CacheMode.*;
 
@@ -63,7 +64,7 @@ public class IgniteSocketStreamerTest extends 
GridCommonAbstractTest {
     public void testStreamer() throws Exception {
         try (Ignite g = startGrid()) {
 
-            IgniteCache<Integer, String> cache = g.jcache(null);
+            IgniteCache<Integer, String> cache = g.cache(null);
 
             cache.clear();
 
@@ -78,15 +79,29 @@ public class IgniteSocketStreamerTest extends 
GridCommonAbstractTest {
                         }
                     };
 
+                final AtomicInteger cnt = new AtomicInteger();
+
                 IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, 
String> sockStmr =
-                    new IgniteSocketStreamer<>(HOST, PORT, stmr, converter);
+                    new IgniteSocketStreamer<IgniteBiTuple<Integer, String>, 
Integer, String>(
+                            HOST, PORT, stmr, converter
+                    ) {
+                        @Override protected void 
addData(IgniteBiTuple<Integer, String> element) {
+                            super.addData(element);
+
+                            cnt.incrementAndGet();
+                        }
+                    };
+
+                sockStmr.start();
 
-                IgniteFuture<Void> fut = sockStmr.start();
+                // Wait for all data streamed.
+                while (cnt.get() < ENTRY_CNT)
+                    Thread.sleep(200);
 
-                fut.get();
+                sockStmr.stop();
 
-                assertTrue(fut.isDone());
-                assertFalse(fut.isCancelled());
+                assertFalse(sockStmr.isStarted());
+                assertTrue(sockStmr.isStopped());
             }
 
             assertEquals(ENTRY_CNT, cache.size());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
index 6f9b228..635b983 100644
--- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
@@ -8,8 +8,9 @@ import org.apache.ignite.testframework.junits.common.*;
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.concurrent.atomic.*;
 
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.*;
 
 /**
  * Test for data loading using {@link IgniteTextSocketStreamer}.
@@ -46,7 +47,7 @@ public class IgniteTextSocketStreamerTest extends 
GridCommonAbstractTest {
     public void testStream() throws Exception {
         try (Ignite g = startGrid()) {
 
-            IgniteCache<Integer, String> cache = g.jcache(null);
+            IgniteCache<Integer, String> cache = g.cache(null);
 
             cache.clear();
 
@@ -57,20 +58,33 @@ public class IgniteTextSocketStreamerTest extends 
GridCommonAbstractTest {
                 IgniteClosure<String, Map.Entry<Integer, String>> converter =
                     new IgniteClosure<String, Map.Entry<Integer, String>>() {
                         @Override public Map.Entry<Integer, String> 
apply(String input) {
-                            String[] pair = input.split("=");
+                            String[] pair = input.split("=", 2);
+
                             return new 
IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]);
                         }
                     };
 
+                final AtomicInteger cnt = new AtomicInteger();
+
                 IgniteTextSocketStreamer<Integer, String> sockStmr =
-                    new IgniteTextSocketStreamer<>(HOST, PORT, stmr, 
converter);
+                    new IgniteTextSocketStreamer<Integer, String>(HOST, PORT, 
stmr, converter) {
+                        @Override protected void addData(String element) {
+                            super.addData(element);
+
+                            cnt.incrementAndGet();
+                        }
+                    };
+
+                sockStmr.start();
 
-                IgniteFuture<Void> fut = sockStmr.start();
+                // Wait for all data streamed.
+                while (cnt.get() < ENTRY_CNT)
+                    Thread.sleep(200);
 
-                fut.get();
+                sockStmr.stop();
 
-                assertTrue(fut.isDone());
-                assertFalse(fut.isCancelled());
+                assertFalse(sockStmr.isStarted());
+                assertTrue(sockStmr.isStopped());
             }
 
             assertEquals(ENTRY_CNT, cache.size());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
index 660f47e..4860b97 100644
--- a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
@@ -1,8 +1,9 @@
 package org.apache.ignite.streaming;
 
-import junit.framework.TestCase;
 import org.apache.ignite.*;
 import org.apache.ignite.lang.*;
+
+import junit.framework.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -26,7 +27,7 @@ public class StreamReceiverTest extends TestCase {
     private final StreamReceiver<Integer, Integer, String> receiver =
         new StreamReceiver<Integer, Integer, String>(STMR, CONVERTER) {
             @Override protected void loadData() {
-                while (!isStopped() && !terminatedNormally) {
+                while (!isStopped()) {
                     try {
                         Thread.sleep(50);
                     }
@@ -37,89 +38,25 @@ public class StreamReceiverTest extends TestCase {
             }
         };
 
-    /** Terminated normally flag. */
-    private volatile boolean terminatedNormally;
-
-    /**
-     * Tests receiver behavior in case of normal termination.
-     *
-     * @throws Exception If error occurred.
-     */
-    public void testTerminatedNormally() throws Exception {
-        assertEquals(StreamReceiver.State.INITIALIZED, receiver.state());
-        assertFalse(receiver.isStarted());
-        assertFalse(receiver.isStopped());
-
-        IgniteFuture<Void> fut = receiver.start();
-
-        assertEquals(StreamReceiver.State.STARTED, receiver.state());
-
-        assertTrue(receiver.isStarted());
-        assertFalse(receiver.isStopped());
-
-        assertFalse(fut.isDone());
-        assertFalse(fut.isCancelled());
-
-        try {
-            fut.get(500);
-        }
-        catch (IgniteException e) {
-            // No-op.
-        }
-
-        assertEquals(StreamReceiver.State.STARTED, receiver.state());
-        assertTrue(receiver.isStarted());
-        assertFalse(receiver.isStopped());
-
-        assertFalse(fut.isDone());
-        assertFalse(fut.isCancelled());
-
-        terminatedNormally = true;
-
-        fut.get();
-
-        assertEquals(StreamReceiver.State.STOPPED, receiver.state());
-
-        assertFalse(receiver.isStarted());
-        assertTrue(receiver.isStopped());
-
-        assertTrue(fut.isDone());
-        assertFalse(fut.isCancelled());
-    }
-
     /**
      * Tests receiver behavior in case of forced termination.
      *
      * @throws Exception If error occurred.
      */
-    public void testStopped() throws Exception {
+    public void testReceiver() throws Exception {
         assertEquals(StreamReceiver.State.INITIALIZED, receiver.state());
         assertFalse(receiver.isStarted());
         assertFalse(receiver.isStopped());
 
-        IgniteFuture<Void> fut = receiver.start();
+        receiver.start();
 
         assertEquals(StreamReceiver.State.STARTED, receiver.state());
 
         assertTrue(receiver.isStarted());
         assertFalse(receiver.isStopped());
 
-        assertFalse(fut.isDone());
-        assertFalse(fut.isCancelled());
-
-        try {
-            fut.get(500);
-        }
-        catch (IgniteException e) {
-            // No-op.
-        }
-
-        assertEquals(StreamReceiver.State.STARTED, receiver.state());
-        assertTrue(receiver.isStarted());
-        assertFalse(receiver.isStopped());
-
-        assertFalse(fut.isDone());
-        assertFalse(fut.isCancelled());
+        // Wait for some period before stop.
+        Thread.sleep(500);
 
         receiver.stop();
 
@@ -127,9 +64,6 @@ public class StreamReceiverTest extends TestCase {
 
         assertFalse(receiver.isStarted());
         assertTrue(receiver.isStopped());
-
-        assertTrue(fut.isDone());
-        assertTrue(fut.isCancelled());
     }
 
     /**
@@ -206,7 +140,7 @@ public class StreamReceiverTest extends TestCase {
         }
 
         /** {@inheritDoc} */
-        @Override public void updater(Updater<K, V> updater) {
+        @Override public void 
receiver(org.apache.ignite.stream.StreamReceiver<K, V> rcvr) {
             // No-op.
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e2bb9f2..bae3d46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -791,7 +791,7 @@
                                         </group>
                                         <group>
                                             <title>Streaming APIs</title>
-                                            
<packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream</packages>
+                                            
<packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.streaming</packages>
                                         </group>
                                         <group>
                                             <title>Security APIs</title>
@@ -986,7 +986,7 @@
                                         </group>
                                         <group>
                                             <title>Streaming APIs</title>
-                                            
<packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream</packages>
+                                            
<packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.streaming</packages>
                                         </group>
                                         <group>
                                             <title>Security APIs</title>

Reply via email to