Repository: incubator-ignite Updated Branches: refs/heads/ignite-471-2 4007a4303 -> d64185b82
ignite-471-2: disable a test for portable marshaller Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d64185b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d64185b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d64185b8 Branch: refs/heads/ignite-471-2 Commit: d64185b82640d994b263a9558909a7b86546743c Parents: 4007a43 Author: Denis Magda <dma...@gridgain.com> Authored: Thu Jun 11 09:27:46 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Thu Jun 11 09:27:46 2015 +0300 ---------------------------------------------------------------------- .../ignite/messaging/GridMessagingSelfTest.java | 557 ++++++++++--------- .../ignite/testsuites/IgniteBasicTestSuite.java | 2 +- 2 files changed, 295 insertions(+), 264 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d64185b8/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index fb4ac8c..455bc8c 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -75,6 +75,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser /** Shared IP finder. */ private final transient TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + /** */ + protected static CountDownLatch rcvLatch; + /** * A test message topic. */ @@ -180,7 +183,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); discoSpi.setIpFinder(ipFinder); @@ -196,9 +199,34 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser * @throws Exception If error occurs. */ public void testSendReceiveMessage() throws Exception { - ReceiveRemoteMessageListener<UUID, Object> list = new ReceiveRemoteMessageListener<>(ignite2, 3); + final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>(); + + final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable + + final CountDownLatch rcvLatch = new CountDownLatch(3); + + ignite1.message().localListen(null, new P2<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + try { + log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); - ignite1.message().localListen(null, list); + if (!nodeId.equals(ignite2.cluster().localNode().id())) { + log.error("Unexpected sender node: " + nodeId); + + error.set(true); + + return false; + } + + rcvMsgs.add(msg); + + return true; + } + finally { + rcvLatch.countDown(); + } + } + }); ClusterGroup rNode1 = ignite2.cluster().forRemotes(); @@ -206,13 +234,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser message(rNode1).send(null, MSG_2); message(rNode1).send(null, MSG_3); - assertTrue(list.rcvLatch.await(3, TimeUnit.SECONDS)); + assertTrue(rcvLatch.await(3, TimeUnit.SECONDS)); - assertFalse(list.error.get()); + assertFalse(error.get()); - assertTrue(list.rcvMsgs.contains(MSG_1)); - assertTrue(list.rcvMsgs.contains(MSG_2)); - assertTrue(list.rcvMsgs.contains(MSG_3)); + assertTrue(rcvMsgs.contains(MSG_1)); + assertTrue(rcvMsgs.contains(MSG_2)); + assertTrue(rcvMsgs.contains(MSG_3)); } /** @@ -579,9 +607,24 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser * @throws Exception If error occurs. */ public void testRemoteListen() throws Exception { - MessageReceiverListener list = new MessageReceiverListener(); + final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>(); - ignite2.message().remoteListen(null, list); + rcvLatch = new CountDownLatch(4); + + ignite2.message().remoteListen(null, new P2<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + try { + log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); + + rcvMsgs.add(msg); + + return true; + } + finally { + rcvLatch.countDown(); + } + } + }); ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2. @@ -589,11 +632,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser message(prj2).send(null, MSG_2); message(ignite2.cluster().forLocal()).send(null, MSG_3); - assertFalse(list.rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message. + assertFalse(rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message. - assertTrue(list.rcvMsgs.contains(MSG_1)); - assertTrue(list.rcvMsgs.contains(MSG_2)); - assertTrue(list.rcvMsgs.contains(MSG_3)); + assertTrue(rcvMsgs.contains(MSG_1)); + assertTrue(rcvMsgs.contains(MSG_2)); + assertTrue(rcvMsgs.contains(MSG_3)); } /** @@ -601,19 +644,45 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser */ @SuppressWarnings("TooBroadScope") public void testStopRemoteListen() throws Exception { - final IncrementTestListener list1 = new IncrementTestListener(); - final IncrementTestListener list2 = new IncrementTestListener(); - final IncrementTestListener list3 = new IncrementTestListener(); + final AtomicInteger msgCnt1 = new AtomicInteger(); + + final AtomicInteger msgCnt2 = new AtomicInteger(); + + final AtomicInteger msgCnt3 = new AtomicInteger(); final String topic1 = null; final String topic2 = "top2"; final String topic3 = "top3"; - UUID id1 = ignite2.message().remoteListen(topic1, list1); + UUID id1 = ignite2.message().remoteListen(topic1, new P2<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + System.out.println(Thread.currentThread().getName() + " Listener1 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); - UUID id2 = ignite2.message().remoteListen(topic2, list2); + msgCnt1.incrementAndGet(); - UUID id3 = ignite2.message().remoteListen(topic3, list3); + return true; + } + }); + + UUID id2 = ignite2.message().remoteListen(topic2, new P2<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + System.out.println(Thread.currentThread().getName() + " Listener2 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); + + msgCnt2.incrementAndGet(); + + return true; + } + }); + + UUID id3 = ignite2.message().remoteListen(topic3, new P2<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + System.out.println(Thread.currentThread().getName() + " Listener3 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); + + msgCnt3.incrementAndGet(); + + return true; + } + }); message(ignite1.cluster().forRemotes()).send(topic1, "msg1-1"); message(ignite1.cluster().forRemotes()).send(topic2, "msg1-2"); @@ -621,13 +690,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { - return list1.msgCnt.get() > 0 && list2.msgCnt.get() > 0 && list3.msgCnt.get() > 0; + return msgCnt1.get() > 0 && msgCnt2.get() > 0 && msgCnt3.get() > 0; } }, 5000); - assertEquals(1, list1.msgCnt.get()); - assertEquals(1, list2.msgCnt.get()); - assertEquals(1, list3.msgCnt.get()); + assertEquals(1, msgCnt1.get()); + assertEquals(1, msgCnt2.get()); + assertEquals(1, msgCnt3.get()); ignite2.message().stopRemoteListen(id2); @@ -637,13 +706,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { - return list1.msgCnt.get() > 1 && list3.msgCnt.get() > 1; + return msgCnt1.get() > 1 && msgCnt3.get() > 1; } }, 5000); - assertEquals(2, list1.msgCnt.get()); - assertEquals(1, list2.msgCnt.get()); - assertEquals(2, list3.msgCnt.get()); + assertEquals(2, msgCnt1.get()); + assertEquals(1, msgCnt2.get()); + assertEquals(2, msgCnt3.get()); ignite2.message().stopRemoteListen(id2); // Try remove one more time. @@ -656,9 +725,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser U.sleep(1000); - assertEquals(2, list1.msgCnt.get()); - assertEquals(1, list2.msgCnt.get()); - assertEquals(2, list3.msgCnt.get()); + assertEquals(2, msgCnt1.get()); + assertEquals(1, msgCnt2.get()); + assertEquals(2, msgCnt3.get()); } /** @@ -673,21 +742,46 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser new TestMessage(MSG_2, 3000), new TestMessage(MSG_3)); - ReceiveRemoteMessageListener receiver = new ReceiveRemoteMessageListener<>(ignite1, 3); + final Collection<Object> rcvMsgs = new ConcurrentLinkedDeque<>(); + + final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable + + rcvLatch = new CountDownLatch(3); + + ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + try { + log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); + + if (!nodeId.equals(ignite1.cluster().localNode().id())) { + log.error("Unexpected sender node: " + nodeId); + + error.set(true); + + return false; + } - ignite2.message().remoteListen(S_TOPIC_1, receiver); + rcvMsgs.add(msg); + + return true; + } + finally { + rcvLatch.countDown(); + } + } + }); ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2. for (TestMessage msg : msgs) message(prj2).sendOrdered(S_TOPIC_1, msg, 15000); - assertTrue(receiver.rcvLatch.await(6, TimeUnit.SECONDS)); + assertTrue(rcvLatch.await(6, TimeUnit.SECONDS)); - assertFalse(receiver.error.get()); + assertFalse(error.get()); //noinspection AssertEqualsBetweenInconvertibleTypes - assertEquals(msgs, Arrays.asList(receiver.rcvMsgs.toArray())); + assertEquals(msgs, Arrays.asList(rcvMsgs.toArray())); } /** @@ -697,17 +791,122 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser * @throws Exception If error occurs. */ public void testRemoteListenWithIntTopic() throws Exception { - ListenWithIntTopic topList1 = new ListenWithIntTopic(ignite1, ignite2, I_TOPIC_1, MSG_1); + final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>(); + + final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable + + rcvLatch = new CountDownLatch(3); + + ignite2.message().remoteListen(I_TOPIC_1, new P2<UUID, Object>() { + @IgniteInstanceResource + private transient Ignite g; + + @Override public boolean apply(UUID nodeId, Object msg) { + assertEquals(ignite2, g); + + try { + log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + + ", topic=" + I_TOPIC_1 + ']'); + + if (!nodeId.equals(ignite1.cluster().localNode().id())) { + log.error("Unexpected sender node: " + nodeId); + + error.set(true); + + return false; + } + + if (!MSG_1.equals(msg)) { + log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_1); + + error.set(true); + + return false; + } + + rcvMsgs.add(msg); + + return true; + } + finally { + rcvLatch.countDown(); + } + } + }); + + ignite2.message().remoteListen(I_TOPIC_2, new P2<UUID, Object>() { + @IgniteInstanceResource + private transient Ignite g; + + @Override public boolean apply(UUID nodeId, Object msg) { + assertEquals(ignite2, g); + + try { + log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + + ", topic=" + I_TOPIC_2 + ']'); + + if (!nodeId.equals(ignite1.cluster().localNode().id())) { + log.error("Unexpected sender node: " + nodeId); + + error.set(true); + + return false; + } + + if (!MSG_2.equals(msg)) { + log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_2); + + error.set(true); + + return false; + } + + rcvMsgs.add(msg); + + return true; + } + finally { + rcvLatch.countDown(); + } + } + }); + + ignite2.message().remoteListen(null, new P2<UUID, Object>() { + @IgniteInstanceResource + private transient Ignite g; + + @Override public boolean apply(UUID nodeId, Object msg) { + assertEquals(ignite2, g); + + try { + log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + + ", topic=default]"); + + if (!nodeId.equals(ignite1.cluster().localNode().id())) { + log.error("Unexpected sender node: " + nodeId); - ListenWithIntTopic topList2 = new ListenWithIntTopic(ignite1, ignite2, I_TOPIC_2, MSG_2); + error.set(true); - ListenWithIntTopic topList3 = new ListenWithIntTopic(ignite1, ignite2, null, MSG_3); + return false; + } - ignite2.message().remoteListen(I_TOPIC_1, topList1); + if (!MSG_3.equals(msg)) { + log.error("Unexpected message " + msg + " for topic: default"); - ignite2.message().remoteListen(I_TOPIC_2, topList2); + error.set(true); - ignite2.message().remoteListen(null, topList3); + return false; + } + + rcvMsgs.add(msg); + + return true; + } + finally { + rcvLatch.countDown(); + } + } + }); ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2. @@ -715,17 +914,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser message(prj2).send(I_TOPIC_2, MSG_2); message(prj2).send(null, MSG_3); - assertTrue(topList1.rcvLatch.await(3, TimeUnit.SECONDS)); - assertTrue(topList2.rcvLatch.await(3, TimeUnit.SECONDS)); - assertTrue(topList3.rcvLatch.await(3, TimeUnit.SECONDS)); + assertTrue(rcvLatch.await(3, TimeUnit.SECONDS)); - assertFalse(topList1.error.get()); - assertFalse(topList2.error.get()); - assertFalse(topList3.error.get()); + assertFalse(error.get()); - assertTrue(topList1.rcvMsgs.contains(MSG_1)); - assertTrue(topList2.rcvMsgs.contains(MSG_2)); - assertTrue(topList3.rcvMsgs.contains(MSG_3)); + assertTrue(rcvMsgs.contains(MSG_1)); + assertTrue(rcvMsgs.contains(MSG_2)); + assertTrue(rcvMsgs.contains(MSG_3)); } /** @@ -741,15 +936,36 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser Class rcCls = extLdr.loadClass(EXT_RESOURCE_CLS_NAME); - ReceiveRemoteMessageListener list = new ReceiveRemoteMessageListener(ignite1, 1); + final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable + + final CountDownLatch rcvLatch = new CountDownLatch(1); - ignite2.message().remoteListen(S_TOPIC_1, list); + ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + try { + log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); + + if (!nodeId.equals(ignite1.cluster().localNode().id())) { + log.error("Unexpected sender node: " + nodeId); + + error.set(true); + + return false; + } + + return true; + } + finally { + rcvLatch.countDown(); + } + } + }); message(ignite1.cluster().forRemotes()).send(S_TOPIC_1, Collections.singleton(rcCls.newInstance())); - assertTrue(list.rcvLatch.await(3, TimeUnit.SECONDS)); + assertTrue(rcvLatch.await(3, TimeUnit.SECONDS)); - assertFalse(list.error.get()); + assertFalse(error.get()); } /** @@ -796,6 +1012,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser * @throws Exception If failed. */ public void testAsync() throws Exception { + final AtomicInteger msgCnt = new AtomicInteger(); + assertFalse(ignite2.message().isAsync()); final IgniteMessaging msg = ignite2.message().withAsync(); @@ -804,8 +1022,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser assertFalse(ignite2.message().isAsync()); - final IncrementTestListener list = new IncrementTestListener(); - GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { msg.future(); @@ -816,7 +1032,16 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser final String topic = "topic"; - UUID id = msg.remoteListen(topic, list); + UUID id = msg.remoteListen(topic, new P2<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + System.out.println(Thread.currentThread().getName() + + " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); + + msgCnt.incrementAndGet(); + + return true; + } + }); Assert.assertNull(id); @@ -840,11 +1065,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { - return list.msgCnt.get() > 0; + return msgCnt.get() > 0; } }, 5000); - assertEquals(1, list.msgCnt.get()); + assertEquals(1, msgCnt.get()); msg.stopRemoteListen(id); @@ -866,7 +1091,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser U.sleep(1000); - assertEquals(1, list.msgCnt.get()); + assertEquals(1, msgCnt.get()); } /** @@ -890,7 +1115,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser /** * @param expOldestIgnite Expected oldest ignite. - * @throws InterruptedException If interrupted. */ private void remoteListenForOldest(Ignite expOldestIgnite) throws InterruptedException { ClusterGroup grp = ignite1.cluster().forOldest(); @@ -898,213 +1122,20 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser assertEquals(1, grp.nodes().size()); assertEquals(expOldestIgnite.cluster().localNode().id(), grp.node().id()); - ignite1.message(grp).remoteListen(null, new ListenForOldestListener<UUID, Object>()); - - ignite1.message().send(null, MSG_1); - - Thread.sleep(3000); - - assertEquals(1, MSG_CNT.get()); - } - - /** - * - */ - private static class IncrementTestListener<UUID, Object> implements P2<UUID, Object> { - /** */ - final AtomicInteger msgCnt = new AtomicInteger(); - - /** */ - @LoggerResource - private transient IgniteLogger log; - - /** {@inheritDoc} */ - @Override public boolean apply(UUID nodeId, Object msg) { - log.info("Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); - - msgCnt.incrementAndGet(); - - return true; - } - } - - /** - * - */ - private static class ReceiveRemoteMessageListener<UUID, Object> implements P2<UUID, Object> { - /** */ - final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>(); - - /** */ - final AtomicBoolean error = new AtomicBoolean(false); - - /** */ - final CountDownLatch rcvLatch; - - /** */ - final Ignite sender; - - /** */ - @LoggerResource - private transient IgniteLogger log; - - /** - * @param sender - * @param latchCount - */ - public ReceiveRemoteMessageListener(Ignite sender, int latchCount) { - this.sender = sender; - rcvLatch = new CountDownLatch(latchCount); - } - - /** {@inheritDoc} */ - @Override public boolean apply(UUID nodeId, Object msg) { - try { - log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); - - if (!nodeId.equals(sender.cluster().localNode().id())) { - log.info("Unexpected sender node: " + nodeId); - - error.set(true); - - return false; - } - - rcvMsgs.add(msg); - - return true; - } - finally { - rcvLatch.countDown(); - } - } - } - - - /** - * - */ - private static class ListenForOldestListener<UUID, Object> implements P2<UUID, Object> { - /** */ - @LoggerResource - private transient IgniteLogger log; - - /** {@inheritDoc} */ - @Override public boolean apply(UUID nodeId, Object msg) { - log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); - - MSG_CNT.incrementAndGet(); - - return true; - } - } - - /** - * - */ - private static class ListenWithIntTopic implements P2<UUID, Object> { - /** */ - final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>(); - - /** */ - final AtomicBoolean error = new AtomicBoolean(false); - - /** */ - final CountDownLatch rcvLatch = new CountDownLatch(1); - - /** */ - private final Ignite sender; - - /** */ - private final Ignite receiver; - - /** */ - @IgniteInstanceResource - private transient Ignite g; - - /** */ - @LoggerResource - private transient IgniteLogger log; - - /** */ - final Integer topic; - - /** */ - final String message; - - /** - * @param sender - * @param receiver - * @param topic - * @param message - */ - public ListenWithIntTopic(Ignite sender, Ignite receiver, Integer topic, String message) { - this.sender = sender; - this.receiver = receiver; - this.topic = topic; - this.message = message; - } - - /** {@inheritDoc} */ - @Override public boolean apply(UUID nodeId, Object msg) { - assertEquals(receiver, g); - - try { - log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + - ", topic=" + topic + ']'); - - if (!nodeId.equals(sender.cluster().localNode().id())) { - log.error("Unexpected sender node: " + nodeId); - - error.set(true); - - return false; - } - - if (!message.equals(msg)) { - log.error("Unexpected message " + msg + " for topic: " + topic); - - error.set(true); - - return false; - } + ignite1.message(grp).remoteListen(null, new P2<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + System.out.println("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); - rcvMsgs.add(msg); + MSG_CNT.incrementAndGet(); return true; } - finally { - rcvLatch.countDown(); - } - } - } - - /** - * - */ - private static class MessageReceiverListener<UUID, Object> implements P2<UUID, Object> { - /** */ - final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>(); - - /** */ - final CountDownLatch rcvLatch = new CountDownLatch(4); - - /** */ - @LoggerResource - private transient IgniteLogger log; + }); - /** {@inheritDoc} */ - @Override public boolean apply(UUID nodeId, Object msg) { - try { - log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); + ignite1.message().send(null, MSG_1); - rcvMsgs.add(msg); + Thread.sleep(3000); - return true; - } - finally { - rcvLatch.countDown(); - } - } + assertEquals(1, MSG_CNT.get()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d64185b8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 8c061be..949b76d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -66,7 +66,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTest(new TestSuite(GridSelfTest.class)); GridTestUtils.addTestIfNeeded(suite, GridProjectionSelfTest.class, ignoredTests); - suite.addTest(new TestSuite(GridMessagingSelfTest.class)); + GridTestUtils.addTestIfNeeded(suite, GridMessagingSelfTest.class, ignoredTests); suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class)); GridTestUtils.addTestIfNeeded(suite, GridMessagingNoPeerClassLoadingSelfTest.class, ignoredTests);