ignite-998
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c47438e8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c47438e8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c47438e8 Branch: refs/heads/ignite-gg-10411 Commit: c47438e8d2ea734218bcd9c14945ce43456269ce Parents: af120a7 Author: sboikov <sboi...@gridgain.com> Authored: Wed Jun 10 14:24:24 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jun 10 15:23:37 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 38 +------ .../tcp/TcpClientDiscoverySpiSelfTest.java | 102 ++++++++++++++++++- 2 files changed, 103 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c47438e8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 5aceaae..44374db 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1752,6 +1752,9 @@ class ServerImpl extends TcpDiscoveryImpl { @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid lastMsgId) { assert lastMsgId != null; + if (msgs.isEmpty()) + return Collections.emptyList(); + Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size()); boolean skip = true; @@ -1769,30 +1772,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Resets pending messages. - * - * @param msgs Message. - * @param discardId Discarded message ID. - */ - void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) { - this.msgs.clear(); - - if (msgs != null) - this.msgs.addAll(msgs); - - this.discardId = discardId; - } - - /** - * Clears pending messages. - */ - void clear() { - msgs.clear(); - - discardId = null; - } - - /** * Discards message with provided ID and all before it. * * @param id Discarded message ID. @@ -2921,8 +2900,7 @@ class ServerImpl extends TcpDiscoveryImpl { topHist.clear(); topHist.putAll(msg.topologyHistory()); - // Restore pending messages. - pendingMsgs.reset(msg.messages(), msg.discardedMessageId()); + pendingMsgs.discard(msg.discardedMessageId()); // Clear data to minimize message size. msg.messages(null, null); @@ -3180,10 +3158,6 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Removed node from topology: " + leftNode); - // Clear pending messages map. - if (!ring.hasRemoteNodes()) - pendingMsgs.clear(); - long topVer; if (locNodeCoord) { @@ -3347,10 +3321,6 @@ class ServerImpl extends TcpDiscoveryImpl { assert node != null; - // Clear pending messages map. - if (!ring.hasRemoteNodes()) - pendingMsgs.clear(); - long topVer; if (locNodeCoord) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c47438e8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 7333020..ece898d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -103,12 +103,17 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** */ private boolean longSockTimeouts; + /** */ + private int maxMissedClientHbs = TcpDiscoverySpi.DFLT_MAX_MISSED_CLIENT_HEARTBEATS; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); TcpDiscoverySpi disco = new TestTcpDiscoverySpi(); + disco.setMaxMissedClientHeartbeats(maxMissedClientHbs); + if (gridName.startsWith("server")) disco.setIpFinder(IP_FINDER); else if (gridName.startsWith("client")) { @@ -494,6 +499,96 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testClientReconnectTopologyChange1() throws Exception { + maxMissedClientHbs = 100; + + clientsPerSrv = 1; + + startServerNodes(2); + startClientNodes(1); + + checkNodes(2, 1); + + srvLeftLatch = new CountDownLatch(3); + srvFailedLatch = new CountDownLatch(1); + + attachListeners(2, 0); + + Ignite ignite = G.ignite("client-0"); + + TestTcpDiscoverySpi spi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); + + spi.pauseAll(); + + try { + spi.brakeConnection(); + + Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + + g.close(); + + spi.resumeAll(); + + assertFalse(srvFailedLatch.await(2000, TimeUnit.MILLISECONDS)); + + assertEquals(1L, srvLeftLatch.getCount()); + + checkNodes(2, 1); + } + finally { + spi.resumeAll(); + } + } + + /** + * @throws Exception If failed. + */ + public void testClientReconnectTopologyChange2() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-998"); + + maxMissedClientHbs = 100; + + clientsPerSrv = 1; + + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + srvLeftLatch = new CountDownLatch(2); + srvFailedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + Ignite ignite = G.ignite("client-0"); + + TestTcpDiscoverySpi spi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); + + spi.pauseAll(); + + try { + spi.brakeConnection(); + + Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + + g.close(); + + spi.resumeAll(); + + assertFalse(srvFailedLatch.await(2000, TimeUnit.MILLISECONDS)); + + assertEquals(1L, srvLeftLatch.getCount()); + + checkNodes(1, 1); + } + finally { + spi.resumeAll(); + } + } + + /** + * @throws Exception If failed. + */ public void testGetMissedMessagesOnReconnect() throws Exception { clientsPerSrv = 1; @@ -731,8 +826,6 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** - * TODO: IGNITE-587. - * * @throws Exception If failed. */ public void testDataExchangeFromClient() throws Exception { @@ -740,6 +833,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * @param masterName Node name * @throws Exception If failed. */ private void testDataExchange(String masterName) throws Exception { @@ -890,7 +984,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** - * @param clientIdx Index. + * @param clientIdx Client index. + * @param srvIdx Server index. * @throws Exception In case of error. */ private void setClientRouter(int clientIdx, int srvIdx) throws Exception { @@ -948,6 +1043,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** * @param srvCnt Number of server nodes. * @param clientCnt Number of client nodes. + * @throws Exception If failed. */ private void attachListeners(int srvCnt, int clientCnt) throws Exception { if (srvJoinedLatch != null) {