// Waiting until all data has been streamed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ea1d6213 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ea1d6213 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ea1d6213 Branch: refs/heads/ignite-430 Commit: ea1d6213217a48ac674d1b175b5a849016568dbd Parents: e0b86cd Author: Andrey Gura <ag...@okko.tv> Authored: Sun Mar 29 18:19:18 2015 +0300 Committer: Andrey Gura <ag...@okko.tv> Committed: Sun Mar 29 18:19:18 2015 +0300 ---------------------------------------------------------------------- .../streaming/SocketStreamerExample.java | 9 +-- .../streaming/TextSocketStreamerExample.java | 26 +++---- .../apache/ignite/streaming/StreamReceiver.java | 31 +------- .../apache/ignite/streaming/package-info.java | 21 +++++ .../streaming/IgniteSocketStreamerTest.java | 27 +++++-- .../streaming/IgniteTextSocketStreamerTest.java | 30 +++++-- .../ignite/streaming/StreamReceiverTest.java | 82 ++------------------ pom.xml | 4 +- 8 files changed, 89 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java index 8ed4451..cf24455 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java @@ -19,7 +19,6 @@ package org.apache.ignite.examples.streaming; import org.apache.ignite.*; import org.apache.ignite.examples.*; -import org.apache.ignite.examples.datagrid.*; import 
org.apache.ignite.lang.*; import org.apache.ignite.streaming.*; @@ -33,7 +32,7 @@ import java.util.*; * Remote nodes should always be started with special configuration file which * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. * <p> - * Alternatively you can run {@link CacheNodeStartup} in another JVM which will + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will * start node with {@code examples/config/example-cache.xml} configuration. */ public class SocketStreamerExample { @@ -68,7 +67,7 @@ public class SocketStreamerExample { startServer(); // Clean up caches on all nodes before run. - ignite.jcache(CACHE_NAME).clear(); + ignite.cache(CACHE_NAME).clear(); System.out.println(); System.out.println(">>> Cache clear finished."); @@ -90,9 +89,7 @@ public class SocketStreamerExample { IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr = new IgniteSocketStreamer<>(HOST, PORT, stmr, converter); - IgniteFuture<Void> fut = sockStmr.start(); - - fut.get(); + sockStmr.start(); } long end = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java index 49de6cb..6731a3c 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java @@ -20,7 +20,6 @@ package org.apache.ignite.examples.streaming; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.examples.*; -import org.apache.ignite.examples.datagrid.*; import 
org.apache.ignite.lang.*; import org.apache.ignite.streaming.*; @@ -34,13 +33,10 @@ import java.util.*; * Remote nodes should always be started with special configuration file which * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. * <p> - * Alternatively you can run {@link CacheNodeStartup} in another JVM which will + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will * start node with {@code examples/config/example-cache.xml} configuration. */ public class TextSocketStreamerExample { - /** Cache name. */ - private static final String CACHE_NAME = "partitioned"; - /** Number of entries to load. */ private static final int ENTRY_COUNT = 500000; @@ -69,14 +65,14 @@ public class TextSocketStreamerExample { startServer(); // Clean up caches on all nodes before run. - ignite.jcache(CACHE_NAME).clear(); + ignite.cache(null).clear(); System.out.println(); System.out.println(">>> Cache clear finished."); long start = System.currentTimeMillis(); - try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(CACHE_NAME)) { + try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) { // Configure loader. stmr.perNodeBufferSize(1024); stmr.perNodeParallelOperations(8); @@ -92,25 +88,21 @@ public class TextSocketStreamerExample { IgniteTextSocketStreamer<Integer, String> sockStmr = new IgniteTextSocketStreamer<>(HOST, PORT, stmr, converter); - IgniteFuture<Void> fut = sockStmr.start(); + sockStmr.start(); + //TODO: wait ??? try { - fut.get(500); - } catch (IgniteFutureTimeoutException e) { - // No-op. 
+ Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); } - //fut.get(); - sockStmr.stop(); - - System.out.println(">>> Future done: " + fut.isDone()); - System.out.println(">>> Future canceled: " + fut.isCancelled()); } long end = System.currentTimeMillis(); - System.out.println(">>> Cache Size " + ignite.jcache(CACHE_NAME).size(CachePeekMode.PRIMARY)); + System.out.println(">>> Cache Size " + ignite.cache(null).size(CachePeekMode.PRIMARY)); System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms."); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java index 62e3641..50719a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java +++ b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java @@ -18,7 +18,6 @@ package org.apache.ignite.streaming; import org.apache.ignite.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -65,18 +64,14 @@ public abstract class StreamReceiver<E, K, V> { /** * Starts streamer. */ - public IgniteFuture<Void> start() { + public void start() { synchronized (lock) { if (state != State.INITIALIZED) throw new IllegalStateException("Receiver in " + state + " state can't be started."); - GridFutureAdapter<Void> fut = new GridFutureAdapter<>(); - - new Thread(new Receiver(fut)).start(); + new Thread(new Receiver()).start(); state = State.STARTED; - - return new IgniteFutureImpl<>(fut); } } @@ -156,35 +151,15 @@ public abstract class StreamReceiver<E, K, V> { * Receiver worker that actually receives data from socket. 
*/ private class Receiver implements Runnable { - /** Future. */ - private final GridFutureAdapter<Void> fut; - - /** - * @param fut Future. - */ - public Receiver(GridFutureAdapter<Void> fut) { - this.fut = fut; - } - /** {@inheritDoc} */ @Override public void run() { - Throwable err = null; - try { loadData(); } catch (Throwable e) { - err = e; + //TODO: restart } finally { - if (state == State.STOPPED) - fut.onCancelled(); - else { - state = State.STOPPED; - - fut.onDone(null, err); - } - stopLatch.countDown(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java b/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java new file mode 100644 index 0000000..79bb27e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains Ignite streaming classes. 
+ */ +package org.apache.ignite.streaming; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java index a88c214..9164352 100644 --- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.testframework.junits.common.*; import java.io.*; import java.net.*; import java.util.*; +import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheMode.*; @@ -63,7 +64,7 @@ public class IgniteSocketStreamerTest extends GridCommonAbstractTest { public void testStreamer() throws Exception { try (Ignite g = startGrid()) { - IgniteCache<Integer, String> cache = g.jcache(null); + IgniteCache<Integer, String> cache = g.cache(null); cache.clear(); @@ -78,15 +79,29 @@ public class IgniteSocketStreamerTest extends GridCommonAbstractTest { } }; + final AtomicInteger cnt = new AtomicInteger(); + IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr = - new IgniteSocketStreamer<>(HOST, PORT, stmr, converter); + new IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String>( + HOST, PORT, stmr, converter + ) { + @Override protected void addData(IgniteBiTuple<Integer, String> element) { + super.addData(element); + + cnt.incrementAndGet(); + } + }; + + sockStmr.start(); - IgniteFuture<Void> fut = sockStmr.start(); + // Wait for all data streamed. 
+ while (cnt.get() < ENTRY_CNT) + Thread.sleep(200); - fut.get(); + sockStmr.stop(); - assertTrue(fut.isDone()); - assertFalse(fut.isCancelled()); + assertFalse(sockStmr.isStarted()); + assertTrue(sockStmr.isStopped()); } assertEquals(ENTRY_CNT, cache.size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java index 6f9b228..635b983 100644 --- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java @@ -8,8 +8,9 @@ import org.apache.ignite.testframework.junits.common.*; import java.io.*; import java.net.*; import java.util.*; +import java.util.concurrent.atomic.*; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.*; /** * Test for data loading using {@link IgniteTextSocketStreamer}. 
@@ -46,7 +47,7 @@ public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest { public void testStream() throws Exception { try (Ignite g = startGrid()) { - IgniteCache<Integer, String> cache = g.jcache(null); + IgniteCache<Integer, String> cache = g.cache(null); cache.clear(); @@ -57,20 +58,33 @@ public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest { IgniteClosure<String, Map.Entry<Integer, String>> converter = new IgniteClosure<String, Map.Entry<Integer, String>>() { @Override public Map.Entry<Integer, String> apply(String input) { - String[] pair = input.split("="); + String[] pair = input.split("=", 2); + return new IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]); } }; + final AtomicInteger cnt = new AtomicInteger(); + IgniteTextSocketStreamer<Integer, String> sockStmr = - new IgniteTextSocketStreamer<>(HOST, PORT, stmr, converter); + new IgniteTextSocketStreamer<Integer, String>(HOST, PORT, stmr, converter) { + @Override protected void addData(String element) { + super.addData(element); + + cnt.incrementAndGet(); + } + }; + + sockStmr.start(); - IgniteFuture<Void> fut = sockStmr.start(); + // Wait for all data streamed. 
+ while (cnt.get() < ENTRY_CNT) + Thread.sleep(200); - fut.get(); + sockStmr.stop(); - assertTrue(fut.isDone()); - assertFalse(fut.isCancelled()); + assertFalse(sockStmr.isStarted()); + assertTrue(sockStmr.isStopped()); } assertEquals(ENTRY_CNT, cache.size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java index 660f47e..4860b97 100644 --- a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java @@ -1,8 +1,9 @@ package org.apache.ignite.streaming; -import junit.framework.TestCase; import org.apache.ignite.*; import org.apache.ignite.lang.*; + +import junit.framework.*; import org.jetbrains.annotations.*; import java.util.*; @@ -26,7 +27,7 @@ public class StreamReceiverTest extends TestCase { private final StreamReceiver<Integer, Integer, String> receiver = new StreamReceiver<Integer, Integer, String>(STMR, CONVERTER) { @Override protected void loadData() { - while (!isStopped() && !terminatedNormally) { + while (!isStopped()) { try { Thread.sleep(50); } @@ -37,89 +38,25 @@ public class StreamReceiverTest extends TestCase { } }; - /** Terminated normally flag. */ - private volatile boolean terminatedNormally; - - /** - * Tests receiver behavior in case of normal termination. - * - * @throws Exception If error occurred. 
- */ - public void testTerminatedNormally() throws Exception { - assertEquals(StreamReceiver.State.INITIALIZED, receiver.state()); - assertFalse(receiver.isStarted()); - assertFalse(receiver.isStopped()); - - IgniteFuture<Void> fut = receiver.start(); - - assertEquals(StreamReceiver.State.STARTED, receiver.state()); - - assertTrue(receiver.isStarted()); - assertFalse(receiver.isStopped()); - - assertFalse(fut.isDone()); - assertFalse(fut.isCancelled()); - - try { - fut.get(500); - } - catch (IgniteException e) { - // No-op. - } - - assertEquals(StreamReceiver.State.STARTED, receiver.state()); - assertTrue(receiver.isStarted()); - assertFalse(receiver.isStopped()); - - assertFalse(fut.isDone()); - assertFalse(fut.isCancelled()); - - terminatedNormally = true; - - fut.get(); - - assertEquals(StreamReceiver.State.STOPPED, receiver.state()); - - assertFalse(receiver.isStarted()); - assertTrue(receiver.isStopped()); - - assertTrue(fut.isDone()); - assertFalse(fut.isCancelled()); - } - /** * Tests receiver behavior in case of forced termination. * * @throws Exception If error occurred. */ - public void testStopped() throws Exception { + public void testReceiver() throws Exception { assertEquals(StreamReceiver.State.INITIALIZED, receiver.state()); assertFalse(receiver.isStarted()); assertFalse(receiver.isStopped()); - IgniteFuture<Void> fut = receiver.start(); + receiver.start(); assertEquals(StreamReceiver.State.STARTED, receiver.state()); assertTrue(receiver.isStarted()); assertFalse(receiver.isStopped()); - assertFalse(fut.isDone()); - assertFalse(fut.isCancelled()); - - try { - fut.get(500); - } - catch (IgniteException e) { - // No-op. - } - - assertEquals(StreamReceiver.State.STARTED, receiver.state()); - assertTrue(receiver.isStarted()); - assertFalse(receiver.isStopped()); - - assertFalse(fut.isDone()); - assertFalse(fut.isCancelled()); + // Wait for some period before stop. 
+ Thread.sleep(500); receiver.stop(); @@ -127,9 +64,6 @@ public class StreamReceiverTest extends TestCase { assertFalse(receiver.isStarted()); assertTrue(receiver.isStopped()); - - assertTrue(fut.isDone()); - assertTrue(fut.isCancelled()); } /** @@ -206,7 +140,7 @@ public class StreamReceiverTest extends TestCase { } /** {@inheritDoc} */ - @Override public void updater(Updater<K, V> updater) { + @Override public void receiver(org.apache.ignite.stream.StreamReceiver<K, V> rcvr) { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea1d6213/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e2bb9f2..bae3d46 100644 --- a/pom.xml +++ b/pom.xml @@ -791,7 +791,7 @@ </group> <group> <title>Streaming APIs</title> - <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream</packages> + <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.streaming</packages> </group> <group> <title>Security APIs</title> @@ -986,7 +986,7 @@ </group> <group> <title>Streaming APIs</title> - <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream</packages> + <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.streaming</packages> </group> <group> 
<title>Security APIs</title>