Repository: incubator-ignite Updated Branches: refs/heads/ignite-430 cf9ddc5c5 -> 5f2830daa
ignite-430 IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5f2830da Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5f2830da Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5f2830da Branch: refs/heads/ignite-430 Commit: 5f2830daa3558a8c365990de88b61a6bf5b386e8 Parents: cf9ddc5 Author: Andrey Gura <ag...@okko.tv> Authored: Fri Mar 27 11:23:54 2015 +0300 Committer: Andrey Gura <ag...@okko.tv> Committed: Fri Mar 27 11:23:54 2015 +0300 ---------------------------------------------------------------------- examples/config/example-cache.xml | 4 +- .../ignite/streaming/IgniteSocketStreamer.java | 2 +- .../apache/ignite/streaming/StreamReceiver.java | 14 ++- .../streaming/IgniteTextSocketStreamerTest.java | 5 +- .../ignite/streaming/StreamReceiverTest.java | 104 +++++++++++++++---- 5 files changed, 95 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f2830da/examples/config/example-cache.xml ---------------------------------------------------------------------- diff --git a/examples/config/example-cache.xml b/examples/config/example-cache.xml index 0bce70e..706f959 100644 --- a/examples/config/example-cache.xml +++ b/examples/config/example-cache.xml @@ -152,8 +152,8 @@ to our documentation: http://doc.gridgain.org/latest/Automatic+Node+Discovery --> <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. 
--> - <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> - <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">--> + <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f2830da/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java index e599cdd..dc53e0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java @@ -33,7 +33,7 @@ import java.util.*; * @param <K> Cache entry key type. * @param <V> Cache entry value type. */ -public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E,K,V> { +public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E, K, V> { /** Host. 
*/ private final String host; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f2830da/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java index 07d6775..62e3641 100644 --- a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java +++ b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java @@ -28,9 +28,9 @@ import java.util.concurrent.*; /** * Base implementation of stream receiver. * - * @param <E> - * @param <K> - * @param <V> + * @param <E> Type of stream element. + * @param <K> Type of cache entry key. + * @param <V> Type of cache entry value. */ public abstract class StreamReceiver<E, K, V> { /** Object monitor. */ @@ -92,7 +92,8 @@ public abstract class StreamReceiver<E, K, V> { try { stopLatch.await(); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { // No-op. 
} } @@ -178,8 +179,11 @@ public abstract class StreamReceiver<E, K, V> { finally { if (state == State.STOPPED) fut.onCancelled(); - else + else { + state = State.STOPPED; + fut.onDone(null, err); + } stopLatch.countDown(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f2830da/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java index 7ba2b13..6f9b228 100644 --- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java @@ -69,14 +69,13 @@ public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest { fut.get(); - System.out.println(">>> STATE " + sockStmr.state()); - assertTrue(fut.isDone()); assertFalse(fut.isCancelled()); } assertEquals(ENTRY_CNT, cache.size()); - } finally { + } + finally { stopAllGrids(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f2830da/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java index fd414c6..660f47e 100644 --- a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java @@ -8,35 +8,44 @@ import org.jetbrains.annotations.*; import java.util.*; /** - * + * Tests for {@link StreamReceiver}. */ public class StreamReceiverTest extends TestCase { - + /** Converter. 
*/ private static final IgniteClosure<Integer, Map.Entry<Integer, String>> CONVERTER = new IgniteClosure<Integer, Map.Entry<Integer, String>>() { - @Override public Map.Entry<Integer, String> apply(Integer input) { - return new IgniteBiTuple<>(input, input.toString()); - } + @Override public Map.Entry<Integer, String> apply(Integer input) { + return new IgniteBiTuple<>(input, input.toString()); + } }; + /** Stmr. */ private static final IgniteDataStreamer<Integer, String> STMR = new DataStreamerStub<>(); - private volatile boolean finished = false; - - public void testName() throws Exception { - StreamReceiver<Integer, Integer, String> receiver = - new StreamReceiver<Integer, Integer, String>(STMR, CONVERTER) { - @Override protected void loadData() { - while (!isStopped() && !finished) { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - e.printStackTrace(); - } + /** Receiver. */ + private final StreamReceiver<Integer, Integer, String> receiver = + new StreamReceiver<Integer, Integer, String>(STMR, CONVERTER) { + @Override protected void loadData() { + while (!isStopped() && !terminatedNormally) { + try { + Thread.sleep(50); + } + catch (InterruptedException e) { + // No-op. } } - }; - + } + }; + + /** Terminated normally flag. */ + private volatile boolean terminatedNormally; + + /** + * Tests receiver behavior in case of normal termination. + * + * @throws Exception If error occurred. 
+ */ + public void testTerminatedNormally() throws Exception { assertEquals(StreamReceiver.State.INITIALIZED, receiver.state()); assertFalse(receiver.isStarted()); assertFalse(receiver.isStopped()); @@ -65,8 +74,7 @@ public class StreamReceiverTest extends TestCase { assertFalse(fut.isDone()); assertFalse(fut.isCancelled()); - //finished = true; - receiver.stop(); + terminatedNormally = true; fut.get(); @@ -77,9 +85,59 @@ public class StreamReceiverTest extends TestCase { assertTrue(fut.isDone()); assertFalse(fut.isCancelled()); + } + + /** + * Tests receiver behavior in case of forced termination. + * + * @throws Exception If error occurred. + */ + public void testStopped() throws Exception { + assertEquals(StreamReceiver.State.INITIALIZED, receiver.state()); + assertFalse(receiver.isStarted()); + assertFalse(receiver.isStopped()); + + IgniteFuture<Void> fut = receiver.start(); + + assertEquals(StreamReceiver.State.STARTED, receiver.state()); + + assertTrue(receiver.isStarted()); + assertFalse(receiver.isStopped()); + + assertFalse(fut.isDone()); + assertFalse(fut.isCancelled()); + + try { + fut.get(500); + } + catch (IgniteException e) { + // No-op. + } + + assertEquals(StreamReceiver.State.STARTED, receiver.state()); + assertTrue(receiver.isStarted()); + assertFalse(receiver.isStopped()); + + assertFalse(fut.isDone()); + assertFalse(fut.isCancelled()); + + receiver.stop(); + assertEquals(StreamReceiver.State.STOPPED, receiver.state()); + + assertFalse(receiver.isStarted()); + assertTrue(receiver.isStopped()); + + assertTrue(fut.isDone()); + assertTrue(fut.isCancelled()); } + /** + * Receiver stub. + * + * @param <K> Key type. + * @param <V> Value type. + */ private static class DataStreamerStub<K, V> implements IgniteDataStreamer<K, V> { /** {@inheritDoc} */ @@ -94,7 +152,7 @@ public class StreamReceiverTest extends TestCase { /** {@inheritDoc} */ @Override public void allowOverwrite(boolean allowOverwrite) throws IgniteException { - + // No-op. 
} /** {@inheritDoc} */ @@ -169,7 +227,7 @@ public class StreamReceiverTest extends TestCase { /** {@inheritDoc} */ @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) - throws IllegalStateException { + throws IllegalStateException { return null; }